You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/18 21:56:36 UTC
[36/50] incubator-usergrid git commit: add replay strategy
add replay strategy
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/117c0f3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/117c0f3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/117c0f3d
Branch: refs/heads/USERGRID-460
Commit: 117c0f3d8839cb1da2cae35b9a1171c1b404ac98
Parents: 4414838
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Mar 13 12:10:55 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Mar 13 12:10:55 2015 -0600
----------------------------------------------------------------------
.../mvcc/stage/write/WriteUniqueVerify.java | 94 +++++++++-----------
1 file changed, 42 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/117c0f3d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index 5f96b92..173efa7 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -101,9 +101,46 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
final CollectionScope scope = ioevent.getEntityCollection();
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+ //allocate our max size, worst case
+ final List<Field> uniqueFields = new ArrayList<>( entity.getFields().size() );
+
+ //
+ // Construct all the functions for verifying we're unique
+ //
+ for ( final Field field : entity.getFields() ) {
+
+ // if it's unique, create a function to validate it and add it to the list of
+ // concurrent validations
+ if ( field.isUnique() ) {
+ // use write-first then read strategy
+ final UniqueValue written = new UniqueValueImpl( field, mvccEntity.getId(), mvccEntity.getVersion() );
+
+ // use TTL in case something goes wrong before entity is finally committed
+ final MutationBatch mb = uniqueValueStrat.write( scope, written, serializationFig.getTimeout() );
+
+ batch.mergeShallow( mb );
+ uniqueFields.add(field);
+ }
+ }
+
+ //short circuit nothing to do
+ if ( uniqueFields.size() == 0 ) {
+ return ;
+ }
+
+ //perform the write
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException ex ) {
+ throw new RuntimeException( "Unable to write to cassandra", ex );
+ }
+
// use simple thread pool to verify fields in parallel
- ConsistentReplayCommand cmd = new ConsistentReplayCommand(uniqueValueStrat,keyspace,serializationFig,cassandraFig,scope,entity);
+ ConsistentReplayCommand cmd = new ConsistentReplayCommand(uniqueValueStrat,cassandraFig,scope, uniqueFields,entity);
Map<String,Field> uniquenessViolations = cmd.execute();
+ cmd.getFailedExecutionException();
//We have violations, throw an exception
if ( !uniquenessViolations.isEmpty() ) {
throw new WriteUniqueVerifyException( mvccEntity, ioevent.getEntityCollection(), uniquenessViolations );
@@ -113,19 +150,17 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
private static class ConsistentReplayCommand extends HystrixCommand<Map<String,Field>>{
private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
- private final Keyspace keySpace;
- private final SerializationFig serializationFig;
private final CassandraConfig fig;
private final CollectionScope scope;
+ private final List<Field> uniqueFields;
private final Entity entity;
- public ConsistentReplayCommand(UniqueValueSerializationStrategy uniqueValueSerializationStrategy,Keyspace keySpace, SerializationFig serializationFig, CassandraConfig fig,CollectionScope scope, Entity entity){
+ public ConsistentReplayCommand(UniqueValueSerializationStrategy uniqueValueSerializationStrategy, CassandraConfig fig, CollectionScope scope, List<Field> uniqueFields, Entity entity){
super(REPLAY_GROUP);
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
- this.keySpace = keySpace;
- this.serializationFig = serializationFig;
this.fig = fig;
this.scope = scope;
+ this.uniqueFields = uniqueFields;
this.entity = entity;
}
@@ -142,23 +177,17 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
public Map<String, Field> executeStrategy(ConsistencyLevel consistencyLevel){
Collection<Field> entityFields = entity.getFields();
//allocate our max size, worst case
- final List<Field> uniqueFields = new ArrayList<Field>( entityFields.size() );
//now get the set of fields back
final UniqueValueSet uniqueValues;
- //todo add consistencylevel and read back if fail using
-
try {
-
- uniqueValues = uniqueValueSerializationStrategy.load( scope,consistencyLevel, uniqueFields );
+ uniqueValues = uniqueValueSerializationStrategy.load( scope,consistencyLevel, entityFields );
}
catch ( ConnectionException e ) {
throw new RuntimeException( "Unable to read from cassandra", e );
}
-
final Map<String, Field> uniquenessViolations = new HashMap<>( uniqueFields.size() );
-
//loop through each field that was unique
for ( final Field field : uniqueFields ) {
@@ -170,51 +199,12 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
field.getName() ) );
}
-
final Id returnedEntityId = uniqueValue.getEntityId();
-
if ( !entity.getId().equals(returnedEntityId) ) {
uniquenessViolations.put( field.getName(), field );
}
}
- final MutationBatch batch = keySpace.prepareMutationBatch();
- //
- // Construct all the functions for verifying we're unique
- //
- for ( final Field field : entity.getFields() ) {
-
- // if it's unique, create a function to validate it and add it to the list of
- // concurrent validations
- if ( field.isUnique() ) {
-
-
- // use write-first then read strategy
- final UniqueValue written = new UniqueValueImpl( field, entity.getId(), entity.getVersion() );
-
- // use TTL in case something goes wrong before entity is finally committed
- final MutationBatch mb = uniqueValueSerializationStrategy.write( scope, written, serializationFig.getTimeout() );
-
- batch.mergeShallow( mb );
-
-
- uniqueFields.add(field);
- }
- }
-
- //short circuit nothing to do
- if ( uniqueFields.size() == 0 ) {
- return uniquenessViolations ;
- }
-
-
- //perform the write
- try {
- batch.execute();
- }
- catch ( ConnectionException ex ) {
- throw new RuntimeException( "Unable to write to cassandra", ex );
- }
return uniquenessViolations;
}