You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/13 16:21:04 UTC
[16/17] git commit: New RollbackAction and exception handling changes.
New RollbackAction and exception handling changes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8c4d8ddf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8c4d8ddf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8c4d8ddf
Branch: refs/pull/52/head
Commit: 8c4d8ddf7a8dc6debcc4db03271550122d24ead4
Parents: f7a5072
Author: Dave Johnson <dm...@apigee.com>
Authored: Thu Mar 13 10:35:25 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Thu Mar 13 10:35:25 2014 -0400
----------------------------------------------------------------------
.../exception/CollectionRuntimeException.java | 40 ++++--
.../exception/WriteCommitException.java | 22 +--
.../WriteOptimisticVerifyException.java | 24 ++--
.../exception/WriteStartException.java | 21 +--
.../exception/WriteUniqueVerifyException.java | 22 +--
.../collection/guice/CollectionModule.java | 3 +
.../impl/EntityCollectionManagerImpl.java | 23 +--
.../mvcc/stage/delete/MarkCommit.java | 3 +-
.../collection/mvcc/stage/delete/MarkStart.java | 6 +-
.../mvcc/stage/write/RollbackAction.java | 142 +++++++++++++++++++
.../mvcc/stage/write/WriteCommit.java | 24 ++--
.../mvcc/stage/write/WriteOptimisticVerify.java | 92 +-----------
.../collection/mvcc/stage/write/WriteStart.java | 3 +-
.../mvcc/stage/write/WriteUniqueVerify.java | 9 +-
.../MvccEntitySerializationStrategyImpl.java | 4 +-
.../stage/write/WriteOptimisticVerifyTest.java | 6 +
.../mvcc/stage/write/WriteUniqueVerifyIT.java | 2 -
17 files changed, 274 insertions(+), 172 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/CollectionRuntimeException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/CollectionRuntimeException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/CollectionRuntimeException.java
index 468a4cb..d8b4e53 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/CollectionRuntimeException.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/CollectionRuntimeException.java
@@ -17,31 +17,53 @@
*/
package org.apache.usergrid.persistence.collection.exception;
-/**
- * @author tnine
- */
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
public class CollectionRuntimeException extends RuntimeException {
- public CollectionRuntimeException() {
+
+ private Entity entity;
+ private CollectionScope collectionScope;
+
+
+ public CollectionRuntimeException( Entity entity, CollectionScope scope ) {
+ this.entity = entity;
+ this.collectionScope = scope;
}
- public CollectionRuntimeException( final String message ) {
+ public CollectionRuntimeException( Entity entity, CollectionScope scope, final String message ) {
super( message );
}
- public CollectionRuntimeException( final String message, final Throwable cause ) {
+ public CollectionRuntimeException( Entity entity, CollectionScope scope, final String message, final Throwable cause ) {
super( message, cause );
}
- public CollectionRuntimeException( final Throwable cause ) {
+ public CollectionRuntimeException( Entity entity, CollectionScope scope, final Throwable cause ) {
super( cause );
}
- public CollectionRuntimeException( final String message, final Throwable cause, final boolean enableSuppression,
- final boolean writableStackTrace ) {
+ public CollectionRuntimeException( Entity entity, CollectionScope scope,
+ final String message, final Throwable cause, final boolean enableSuppression,
+ final boolean writableStackTrace ) {
super( message, cause, enableSuppression, writableStackTrace );
}
+
+
+ public CollectionScope getCollectionScope() {
+ return collectionScope;
+ }
+
+ /**
+ * Entity involved in operation.
+ * @return Entity or null if entity not instantiated yet in operation.
+ */
+ public Entity getEntity() {
+ return entity;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteCommitException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteCommitException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteCommitException.java
index 14c1615..e3f753e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteCommitException.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteCommitException.java
@@ -17,28 +17,28 @@
*/
package org.apache.usergrid.persistence.collection.exception;
-public class WriteCommitException extends CollectionRuntimeException {
- public WriteCommitException() {
- }
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+public class WriteCommitException extends CollectionRuntimeException {
- public WriteCommitException( final String message ) {
- super( message );
+ public WriteCommitException( Entity entity, CollectionScope scope, final String message ) {
+ super( entity, scope, message );
}
- public WriteCommitException( final String message, final Throwable cause ) {
- super( message, cause );
+ public WriteCommitException( Entity entity, CollectionScope scope, final String message, final Throwable cause ) {
+ super( entity, scope, message, cause );
}
- public WriteCommitException( final Throwable cause ) {
- super( cause );
+ public WriteCommitException( Entity entity, CollectionScope scope, final Throwable cause ) {
+ super( entity, scope, cause );
}
- public WriteCommitException( final String message, final Throwable cause, final boolean enableSuppression,
+ public WriteCommitException( Entity entity, CollectionScope scope, final String message, final Throwable cause, final boolean enableSuppression,
final boolean writableStackTrace ) {
- super( message, cause, enableSuppression, writableStackTrace );
+ super( entity, scope, message, cause, enableSuppression, writableStackTrace );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteOptimisticVerifyException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteOptimisticVerifyException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteOptimisticVerifyException.java
index b8b8f4f..7895cb9 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteOptimisticVerifyException.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteOptimisticVerifyException.java
@@ -17,28 +17,30 @@
*/
package org.apache.usergrid.persistence.collection.exception;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
public class WriteOptimisticVerifyException extends CollectionRuntimeException {
- public WriteOptimisticVerifyException() {
- }
- public WriteOptimisticVerifyException( final String message ) {
- super( message );
+ public WriteOptimisticVerifyException( Entity entity, CollectionScope scope, final String message ) {
+ super( entity, scope, message );
}
- public WriteOptimisticVerifyException( final String message, final Throwable cause ) {
- super( message, cause );
+ public WriteOptimisticVerifyException( Entity entity, CollectionScope scope, final String message, final Throwable cause ) {
+ super( entity, scope, message, cause );
}
- public WriteOptimisticVerifyException( final Throwable cause ) {
- super( cause );
+ public WriteOptimisticVerifyException( Entity entity, CollectionScope scope, final Throwable cause ) {
+ super( entity, scope, cause );
}
- public WriteOptimisticVerifyException( final String message, final Throwable cause, final boolean enableSuppression,
- final boolean writableStackTrace ) {
- super( message, cause, enableSuppression, writableStackTrace );
+ public WriteOptimisticVerifyException( Entity entity, CollectionScope scope,
+ final String message, final Throwable cause, final boolean enableSuppression,
+ final boolean writableStackTrace ) {
+ super( entity, scope, message, cause, enableSuppression, writableStackTrace );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteStartException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteStartException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteStartException.java
index a4a5f9e..f422ebd 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteStartException.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteStartException.java
@@ -17,28 +17,29 @@
*/
package org.apache.usergrid.persistence.collection.exception;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
public class WriteStartException extends CollectionRuntimeException {
- public WriteStartException() {
- }
- public WriteStartException( final String message ) {
- super( message );
+ public WriteStartException( Entity entity, CollectionScope scope, final String message ) {
+ super( entity, scope, message );
}
- public WriteStartException( final String message, final Throwable cause ) {
- super( message, cause );
+ public WriteStartException( Entity entity, CollectionScope scope, final String message, final Throwable cause ) {
+ super( entity, scope, message, cause );
}
- public WriteStartException( final Throwable cause ) {
- super( cause );
+ public WriteStartException( Entity entity, CollectionScope scope, final Throwable cause ) {
+ super( entity, scope, cause );
}
- public WriteStartException( final String message, final Throwable cause, final boolean enableSuppression,
+ public WriteStartException( Entity entity, CollectionScope scope, final String message, final Throwable cause, final boolean enableSuppression,
final boolean writableStackTrace ) {
- super( message, cause, enableSuppression, writableStackTrace );
+ super( entity, scope, message, cause, enableSuppression, writableStackTrace );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java
index 66b3076..503bf4f 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java
@@ -18,6 +18,8 @@
package org.apache.usergrid.persistence.collection.exception;
import java.util.Map;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.field.Field;
@@ -28,25 +30,27 @@ public class WriteUniqueVerifyException extends CollectionRuntimeException {
private Map<String, Field> violations;
- public WriteUniqueVerifyException( Map<String, Field> violations ) {
- super( "Error: one or more duplicate fields detected");
+ public WriteUniqueVerifyException( Entity entity, CollectionScope scope, Map<String, Field> violations ) {
+ super( entity, scope, "Error: one or more duplicate fields detected");
this.violations = violations;
}
- public WriteUniqueVerifyException( final String message, final Throwable cause ) {
- super( message, cause );
+ public WriteUniqueVerifyException( Entity entity, CollectionScope scope,
+ final String message, final Throwable cause ) {
+ super( entity, scope, message, cause );
}
- public WriteUniqueVerifyException( final Throwable cause ) {
- super( cause );
+ public WriteUniqueVerifyException( Entity entity, CollectionScope scope, final Throwable cause ) {
+ super( entity, scope, cause );
}
- public WriteUniqueVerifyException( final String message, final Throwable cause, final boolean enableSuppression,
- final boolean writableStackTrace ) {
- super( message, cause, enableSuppression, writableStackTrace );
+ public WriteUniqueVerifyException( Entity entity, CollectionScope scope,
+ final String message, final Throwable cause, final boolean enableSuppression,
+ final boolean writableStackTrace ) {
+ super( entity, scope, message, cause, enableSuppression, writableStackTrace );
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 2837255..a954a78 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -28,6 +28,7 @@ import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerSy
import org.apache.usergrid.persistence.collection.migration.MigrationManagerFig;
import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategyImpl;
import org.apache.usergrid.persistence.collection.rx.CassandraThreadScheduler;
@@ -71,5 +72,7 @@ public class CollectionModule extends AbstractModule {
.build( EntityCollectionManagerFactory.class ) );
bind( Scheduler.class ).toProvider( CassandraThreadScheduler.class );
+
+ bind( RollbackAction.class );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 2cec4b1..9685114 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -30,6 +30,7 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
import org.apache.usergrid.persistence.collection.mvcc.stage.load.Load;
+import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
@@ -56,7 +57,7 @@ import rx.util.functions.FuncN;
*/
public class EntityCollectionManagerImpl implements EntityCollectionManager {
- private static final Logger logger = LoggerFactory.getLogger(EntityCollectionManagerImpl.class);
+ private static final Logger log = LoggerFactory.getLogger(EntityCollectionManagerImpl.class);
private final CollectionScope collectionScope;
private final UUIDService uuidService;
@@ -68,6 +69,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
private final WriteUniqueVerify writeVerifyUnique;
private final WriteOptimisticVerify writeOptimisticVerify;
private final WriteCommit writeCommit;
+ private final RollbackAction rollback;
//load stages
private final Load load;
@@ -86,6 +88,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
final WriteUniqueVerify writeVerifyUnique,
final WriteOptimisticVerify writeOptimisticVerify,
final WriteCommit writeCommit,
+ final RollbackAction rollback,
final Load load,
final MarkStart markStart,
final MarkCommit markCommit,
@@ -98,6 +101,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
this.writeVerifyUnique = writeVerifyUnique;
this.writeOptimisticVerify = writeOptimisticVerify;
this.writeCommit = writeCommit;
+ this.rollback = rollback;
this.load = load;
this.markStart = markStart;
this.markCommit = markCommit;
@@ -141,7 +145,10 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
Observable<CollectionIoEvent<MvccEntity>> observable =
- Observable.from( writeData ).subscribeOn( scheduler ).map( writeStart ).flatMap(
+ Observable.from( writeData )
+ .subscribeOn( scheduler )
+ .map( writeStart )
+ .flatMap(
new Func1<CollectionIoEvent<MvccEntity>, Observable<CollectionIoEvent<MvccEntity>>>() {
@Override
@@ -156,7 +163,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
// WriteVerifyUnique stage to execute multiple verification steps in
// parallel and zip the results
-
Observable<CollectionIoEvent<MvccEntity>> unique =
Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( scheduler )
.flatMap( writeVerifyUnique);
@@ -167,13 +173,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( scheduler )
.map( writeOptimisticVerify );
-
// zip the results
// TODO: Should the zip only return errors here, and if errors are present,
// we throw during the zip phase? I couldn't find "
return Observable.zip( unique, optimistic, new Func2<CollectionIoEvent<MvccEntity>,
CollectionIoEvent<MvccEntity>, CollectionIoEvent<MvccEntity>>() {
+
@Override
public CollectionIoEvent<MvccEntity> call(
final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent,
@@ -181,18 +187,17 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
return mvccEntityCollectionIoEvent;
}
- } );
+ });
}
- } );
-
+ });
// execute all validation stages concurrently. Needs refactored when this is done.
// https://github.com/Netflix/RxJava/issues/627
// observable = Concurrent.concurrent( observable, scheduler, new WaitZip(),
// writeVerifyUnique, writeOptimisticVerify );
- //return the commit result.
- return observable.map( writeCommit );
+ // return the commit result.
+ return observable.map(writeCommit); // TODO: .doOnError( rollback );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
index 9c1a432..e5e1eef 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
@@ -101,7 +101,8 @@ public class MarkCommit implements Func1<CollectionIoEvent<MvccEntity>, Void> {
}
catch ( ConnectionException e ) {
LOG.error( "Failed to execute write asynchronously ", e );
- throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
+ throw new CollectionRuntimeException( entity.getEntity().get(), collectionScope,
+ "Failed to execute write asynchronously ", e );
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
index 4b0fae5..fe61469 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
@@ -99,12 +99,14 @@ public class MarkStart implements Func1<CollectionIoEvent<Id>, CollectionIoEvent
}
catch ( ConnectionException e ) {
LOG.error( "Failed to execute write asynchronously ", e );
- throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
+ throw new CollectionRuntimeException( null, collectionScope,
+ "Failed to execute write asynchronously ", e );
}
//create the mvcc entity for the next stage
- final MvccEntityImpl nextStage = new MvccEntityImpl(entityId, version, MvccEntity.Status.COMPLETE, Optional.<Entity>absent() );
+ final MvccEntityImpl nextStage = new MvccEntityImpl(
+ entityId, version, MvccEntity.Status.COMPLETE, Optional.<Entity>absent() );
return new CollectionIoEvent<MvccEntity>( collectionScope, nextStage );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
new file mode 100644
index 0000000..3761741
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.collection.mvcc.stage.write;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
+import org.apache.usergrid.persistence.collection.guice.CollectionModule;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.field.Field;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Scheduler;
+import rx.util.functions.Action1;
+import rx.util.functions.Func1;
+import rx.util.functions.FuncN;
+
+
+/**
+ * Reverses any changes made on behalf of the specified entity. When an exception is thrown
+ * during a write operation this action is called to rollback any changes made.
+ */
+public class RollbackAction implements Action1<Throwable> {
+
+ private static final Logger log = LoggerFactory.getLogger(RollbackAction.class);
+
+ private final Scheduler scheduler;
+ private final UniqueValueSerializationStrategy uniqueValueStrat;
+
+
+ public RollbackAction() {
+ Injector injector = Guice.createInjector( new CollectionModule() );
+ scheduler = injector.getInstance( Scheduler.class );
+ uniqueValueStrat = injector.getInstance( UniqueValueSerializationStrategy.class );
+ }
+
+
+ public void call( final Throwable t ) {
+
+ if ( t instanceof CollectionRuntimeException ) {
+
+ CollectionRuntimeException cre = (CollectionRuntimeException)t;
+ final Entity entity = cre.getEntity();
+ final CollectionScope scope = cre.getCollectionScope();
+
+ // Delete all unique values of entity, and do it concurrently
+ List<Observable<FieldDeleteResult>> results
+ = new ArrayList<Observable<FieldDeleteResult>>();
+
+ int uniqueFieldCount = 0;
+ for (final Field field : entity.getFields()) {
+
+ // if it's unique, create a function to delete it
+ if (field.isUnique()) {
+
+ uniqueFieldCount++;
+
+ Observable<FieldDeleteResult> result = Observable.from(field)
+ .subscribeOn(scheduler).map(new Func1<Field, FieldDeleteResult>() {
+
+ @Override
+ public FieldDeleteResult call(Field field) {
+ UniqueValue toDelete = new UniqueValueImpl(
+ scope, field, entity.getId(), entity.getVersion());
+
+ MutationBatch mb = uniqueValueStrat.delete(toDelete);
+ try {
+ mb.execute();
+ } catch (ConnectionException ex) {
+ throw new WriteUniqueVerifyException(
+ entity, scope, "Rollback error deleting unique value " + field.toString(), ex);
+ }
+ return new FieldDeleteResult(field.getName());
+ }
+ });
+
+ results.add(result);
+ }
+ }
+
+ if (uniqueFieldCount > 0) {
+
+ final FuncN<Boolean> zipFunction = new FuncN<Boolean>() {
+
+ @Override
+ public Boolean call(final Object... args) {
+ for (Object resultObj : args) {
+
+ FieldDeleteResult result = (FieldDeleteResult) resultObj;
+ log.debug("Rollback deleted unique value from entity: {} version: {} name: {}",
+ new String[]{
+ entity.getId().toString(),
+ entity.getVersion().toString(),
+ result.getName()
+ });
+ }
+ return true;
+ }
+ };
+
+ // "zip" up the concurrent results
+ Observable.zip(results, zipFunction).toBlockingObservable().last();
+ }
+ }
+ }
+
+ class FieldDeleteResult {
+
+ private final String name;
+
+ public FieldDeleteResult(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return this.name;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index 32bc540..0f34d37 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -80,32 +80,27 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Entity>
@Override
public Entity call( final CollectionIoEvent<MvccEntity> ioEvent ) {
- final MvccEntity entity = ioEvent.getEvent();
-
-
- ValidationUtils.verifyMvccEntityWithEntity( entity );
-
- final Id entityId = entity.getId();
- final UUID version = entity.getVersion();
+ final MvccEntity mvccEntity = ioEvent.getEvent();
+ ValidationUtils.verifyMvccEntityWithEntity( mvccEntity );
+ final Id entityId = mvccEntity.getId();
+ final UUID version = mvccEntity.getVersion();
final CollectionScope collectionScope = ioEvent.getEntityCollection();
-
final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, Stage.COMMITTED );
-
MutationBatch logMutation = logEntryStrat.write( collectionScope, startEntry );
// now get our actual insert into the entity data
- MutationBatch entityMutation = entityStrat.write( collectionScope, entity );
+ MutationBatch entityMutation = entityStrat.write( collectionScope, mvccEntity );
// merge the 2 into 1 mutation
logMutation.mergeShallow( entityMutation );
// re-write the unique values but this time with no TTL
- for ( Field field : entity.getEntity().get().getFields() ) {
+ for ( Field field : mvccEntity.getEntity().get().getFields() ) {
if ( field.isUnique() ) {
UniqueValue written = new UniqueValueImpl(
- ioEvent.getEntityCollection(), field, entity.getId(), entity.getVersion());
+ ioEvent.getEntityCollection(), field, mvccEntity.getId(), mvccEntity.getVersion());
MutationBatch mb = uniqueValueStrat.write( written );
// merge into our existing mutation batch
@@ -119,9 +114,10 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Entity>
}
catch ( ConnectionException e ) {
LOG.error( "Failed to execute write asynchronously ", e );
- throw new WriteCommitException( "Failed to execute write asynchronously ", e );
+ throw new WriteCommitException( mvccEntity.getEntity().get(), collectionScope,
+ "Failed to execute write asynchronously ", e );
}
- return entity.getEntity().get();
+ return mvccEntity.getEntity().get();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
index 9b04bd1..3bc096d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
@@ -20,29 +20,22 @@ package org.apache.usergrid.persistence.collection.mvcc.stage.write;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import java.util.ArrayList;
import java.util.List;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
import org.apache.usergrid.persistence.collection.exception.WriteOptimisticVerifyException;
-import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
import org.apache.usergrid.persistence.collection.mvcc.entity.ValidationUtils;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.field.Field;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import rx.Observable;
import rx.Scheduler;
import rx.util.functions.Func1;
-import rx.util.functions.FuncN;
/**
@@ -96,96 +89,19 @@ public class WriteOptimisticVerify
log.debug("Conflict writing entity id {} version {}",
entity.getId().toString(), entity.getVersion().toString());
- // We're not the first writer, cleanup and throw exception
-
logEntryStrat.delete( collectionScope, entity.getId(), entity.getVersion() );
-
- // Delete all unique values of entity, and do it concurrently
-
- List<Observable<FieldDeleteResult>> results =
- new ArrayList<Observable<FieldDeleteResult>>();
-
- int uniqueFieldCount = 0;
- for ( final Field field : entity.getFields() ) {
-
- // if it's unique, create a function to delete it
- if ( field.isUnique() ) {
-
- uniqueFieldCount++;
-
- Observable<FieldDeleteResult> result = Observable.from( field )
- .subscribeOn( scheduler ).map(new Func1<Field, FieldDeleteResult>() {
-
- @Override
- public FieldDeleteResult call(Field field ) {
-
- UniqueValue toDelete = new UniqueValueImpl(
- ioevent.getEntityCollection(), field,
- entity.getId(), entity.getVersion() );
-
- MutationBatch mb = uniqueValueStrat.delete( toDelete );
- try {
- mb.execute();
- }
- catch ( ConnectionException ex ) {
- throw new WriteUniqueVerifyException(
- "Error deleting unique value " + field.toString(), ex );
- }
- return new FieldDeleteResult( field.getName() );
- }
- });
-
- results.add( result );
- }
- }
-
- if ( uniqueFieldCount > 0 ) {
-
- final FuncN<Boolean> zipFunction = new FuncN<Boolean>() {
-
- @Override
- public Boolean call( final Object... args ) {
- for ( Object resultObj : args ) {
-
- FieldDeleteResult result = ( FieldDeleteResult ) resultObj;
- log.debug("Rollback deleted field from entity: {} version: {} name: {}",
- new String[] {
- entity.getId().toString(),
- entity.getVersion().toString(),
- result.getName()
- });
- }
- return true;
- }
- };
-
- // "zip" up the concurrent results
- Observable.zip( results, zipFunction ).toBlockingObservable().last();
- }
-
- throw new WriteOptimisticVerifyException("Change conflict, not first writer");
+ throw new WriteOptimisticVerifyException( entity, collectionScope,
+ "Change conflict, not first writer");
}
-
} catch ( ConnectionException e ) {
log.error( "Error reading entity log", e );
- throw new CollectionRuntimeException( "Error reading entity log", e );
+ throw new CollectionRuntimeException( entity, collectionScope,
+ "Error reading entity log", e );
}
// No op, just emit the value
return ioevent;
}
-
- class FieldDeleteResult {
- private final String name;
-
- public FieldDeleteResult( String name ) {
- this.name = name;
- }
-
- public String getName() {
- return this.name;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
index c603f57..93b2da2 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
@@ -73,7 +73,8 @@ public class WriteStart implements Func1<CollectionIoEvent<Entity>, CollectionIo
}
catch ( ConnectionException e ) {
LOG.error( "Failed to execute write ", e );
- throw new WriteStartException( "Failed to execute write ", e );
+ throw new WriteStartException( entity, collectionScope,
+ "Failed to execute write ", e );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index ca99c0c..26af8cd 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -122,7 +122,8 @@ public class WriteUniqueVerify implements
}
catch ( ConnectionException ex ) {
throw new WriteUniqueVerifyException(
- "Error writing unique value " + field.toString(), ex );
+ entity, ioevent.getEntityCollection(),
+ "Error writing unique value " + field.toString(), ex );
}
// does the database value match what we wrote?
@@ -131,7 +132,8 @@ public class WriteUniqueVerify implements
loaded = uniqueValueStrat.load( ioevent.getEntityCollection(), field );
}
catch ( ConnectionException ex ) {
- throw new WriteUniqueVerifyException( "Error verifying write", ex );
+ throw new WriteUniqueVerifyException( entity, ioevent.getEntityCollection(),
+ "Error verifying write", ex );
}
return new FieldUniquenessResult( field, loaded.equals( written ) );
@@ -167,7 +169,8 @@ public class WriteUniqueVerify implements
}
if ( !uniquenessVioloations.isEmpty() ) {
- throw new WriteUniqueVerifyException( uniquenessVioloations );
+ throw new WriteUniqueVerifyException(
+ entity, ioevent.getEntityCollection(), uniquenessVioloations );
}
//return the original event
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
index ebb6edb..2af030f 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
@@ -128,7 +128,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
return null;
}
catch ( ConnectionException e ) {
- throw new CollectionRuntimeException( "An error occurred connecting to cassandra", e );
+ throw new CollectionRuntimeException( null, collectionScope, "An error occurred connecting to cassandra", e );
}
@@ -153,7 +153,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
.withColumnRange( version, null, false, maxSize ).execute().getResult();
}
catch ( ConnectionException e ) {
- throw new CollectionRuntimeException( "An error occurred connecting to cassandra", e );
+ throw new CollectionRuntimeException( null, collectionScope, "An error occurred connecting to cassandra", e );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
index e9b2c7f..b396909 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.exception.WriteOptimisticVerifyException;
import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
+import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerImpl;
import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
@@ -46,12 +47,16 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import rx.Scheduler;
@UseModules( TestCollectionModule.class )
public class WriteOptimisticVerifyTest extends AbstractMvccEntityStageTest {
+ private static final Logger log = LoggerFactory.getLogger(WriteOptimisticVerifyTest.class);
+
@Override
protected void validateStage( final CollectionIoEvent<MvccEntity> event ) {
@@ -168,6 +173,7 @@ public class WriteOptimisticVerifyTest extends AbstractMvccEntityStageTest {
newStage.call( new CollectionIoEvent<MvccEntity>(scope, mvccEntity));
} catch (WriteOptimisticVerifyException e) {
+ log.info("Error", e);
conflictDetected = true;
}
assertTrue( conflictDetected );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
index 3deb235..0793ac0 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
@@ -89,9 +89,7 @@ public class WriteUniqueVerifyIT {
// verify two unique value violations
assertEquals( 2, e.getVioliations().size() );
}
-
}
-
}