You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/18 21:56:36 UTC

[36/50] incubator-usergrid git commit: add replay strategy

add replay strategy


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/117c0f3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/117c0f3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/117c0f3d

Branch: refs/heads/USERGRID-460
Commit: 117c0f3d8839cb1da2cae35b9a1171c1b404ac98
Parents: 4414838
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Mar 13 12:10:55 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Mar 13 12:10:55 2015 -0600

----------------------------------------------------------------------
 .../mvcc/stage/write/WriteUniqueVerify.java     | 94 +++++++++-----------
 1 file changed, 42 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/117c0f3d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index 5f96b92..173efa7 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -101,9 +101,46 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
 
         final CollectionScope scope = ioevent.getEntityCollection();
 
+        final MutationBatch batch = keyspace.prepareMutationBatch();
+        //allocate our max size, worst case
+        final List<Field> uniqueFields = new ArrayList<>( entity.getFields().size() );
+
+        //
+        // Construct all the functions for verifying we're unique
+        //
+        for ( final Field field :  entity.getFields() ) {
+
+            // if it's unique, create a function to validate it and add it to the list of
+            // concurrent validations
+            if ( field.isUnique() ) {
+                // use write-first then read strategy
+                final UniqueValue written = new UniqueValueImpl( field, mvccEntity.getId(), mvccEntity.getVersion() );
+
+                // use TTL in case something goes wrong before entity is finally committed
+                final MutationBatch mb = uniqueValueStrat.write( scope, written, serializationFig.getTimeout() );
+
+                batch.mergeShallow( mb );
+                uniqueFields.add(field);
+            }
+        }
+
+        //short circuit nothing to do
+        if ( uniqueFields.size() == 0 ) {
+            return  ;
+        }
+
+        //perform the write
+        try {
+            batch.execute();
+        }
+        catch ( ConnectionException ex ) {
+            throw new RuntimeException( "Unable to write to cassandra", ex );
+        }
+
         // use simple thread pool to verify fields in parallel
-        ConsistentReplayCommand cmd = new ConsistentReplayCommand(uniqueValueStrat,keyspace,serializationFig,cassandraFig,scope,entity);
+        ConsistentReplayCommand cmd = new ConsistentReplayCommand(uniqueValueStrat,cassandraFig,scope, uniqueFields,entity);
         Map<String,Field>  uniquenessViolations = cmd.execute();
+         cmd.getFailedExecutionException();
         //We have violations, throw an exception
         if ( !uniquenessViolations.isEmpty() ) {
             throw new WriteUniqueVerifyException( mvccEntity, ioevent.getEntityCollection(), uniquenessViolations );
@@ -113,19 +150,17 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
     private static class ConsistentReplayCommand extends HystrixCommand<Map<String,Field>>{
 
         private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
-        private final Keyspace keySpace;
-        private final SerializationFig serializationFig;
         private final CassandraConfig fig;
         private final CollectionScope scope;
+        private final List<Field> uniqueFields;
         private final Entity entity;
 
-        public ConsistentReplayCommand(UniqueValueSerializationStrategy uniqueValueSerializationStrategy,Keyspace keySpace, SerializationFig serializationFig, CassandraConfig fig,CollectionScope scope, Entity entity){
+        public ConsistentReplayCommand(UniqueValueSerializationStrategy uniqueValueSerializationStrategy, CassandraConfig fig, CollectionScope scope, List<Field> uniqueFields, Entity entity){
             super(REPLAY_GROUP);
             this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
-            this.keySpace = keySpace;
-            this.serializationFig = serializationFig;
             this.fig = fig;
             this.scope = scope;
+            this.uniqueFields = uniqueFields;
             this.entity = entity;
         }
 
@@ -142,23 +177,17 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
         public Map<String, Field> executeStrategy(ConsistencyLevel consistencyLevel){
             Collection<Field> entityFields = entity.getFields();
             //allocate our max size, worst case
-            final List<Field> uniqueFields = new ArrayList<Field>( entityFields.size() );
             //now get the set of fields back
             final UniqueValueSet uniqueValues;
-            //todo add consistencylevel and read back if fail using
-
             try {
-
-                uniqueValues = uniqueValueSerializationStrategy.load( scope,consistencyLevel, uniqueFields );
+                uniqueValues = uniqueValueSerializationStrategy.load( scope,consistencyLevel, entityFields );
             }
             catch ( ConnectionException e ) {
                 throw new RuntimeException( "Unable to read from cassandra", e );
             }
 
-
             final Map<String, Field> uniquenessViolations = new HashMap<>( uniqueFields.size() );
 
-
             //loop through each field that was unique
             for ( final Field field : uniqueFields ) {
 
@@ -170,51 +199,12 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
                             field.getName() ) );
                 }
 
-
                 final Id returnedEntityId = uniqueValue.getEntityId();
 
-
                 if ( !entity.getId().equals(returnedEntityId) ) {
                     uniquenessViolations.put( field.getName(), field );
                 }
             }
-            final MutationBatch batch = keySpace.prepareMutationBatch();
-            //
-            // Construct all the functions for verifying we're unique
-            //
-            for ( final Field field :  entity.getFields() ) {
-
-                // if it's unique, create a function to validate it and add it to the list of
-                // concurrent validations
-                if ( field.isUnique() ) {
-
-
-                    // use write-first then read strategy
-                    final UniqueValue written = new UniqueValueImpl( field, entity.getId(), entity.getVersion() );
-
-                    // use TTL in case something goes wrong before entity is finally committed
-                    final MutationBatch mb = uniqueValueSerializationStrategy.write( scope, written, serializationFig.getTimeout() );
-
-                    batch.mergeShallow( mb );
-
-
-                    uniqueFields.add(field);
-                }
-            }
-
-            //short circuit nothing to do
-            if ( uniqueFields.size() == 0 ) {
-                return uniquenessViolations ;
-            }
-
-
-            //perform the write
-            try {
-                batch.execute();
-            }
-            catch ( ConnectionException ex ) {
-                throw new RuntimeException( "Unable to write to cassandra", ex );
-            }
 
             return uniquenessViolations;
         }