You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/15 22:34:50 UTC

[01/11] incubator-usergrid git commit: change consistency level

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-466 2d6ae3698 -> 03341191a


change consistency level


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9e6e36dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9e6e36dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9e6e36dd

Branch: refs/heads/USERGRID-466
Commit: 9e6e36ddb5bd1d38622b100b4916cac6dcbc1d77
Parents: e8dc17d
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Mar 12 17:29:48 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Mar 12 17:29:48 2015 -0600

----------------------------------------------------------------------
 .../mvcc/stage/write/WriteUniqueVerify.java     |  3 +-
 .../UniqueValueSerializationStrategy.java       | 20 ++++++++++----
 .../UniqueValueSerializationStrategyImpl.java   | 19 ++++++++-----
 .../core/astyanax/CassandraConfig.java          |  6 ++++
 .../core/astyanax/CassandraConfigImpl.java      |  8 ++++--
 .../persistence/core/astyanax/CassandraFig.java |  9 ++++--
 .../core/astyanax/ColumnNameIteratorTest.java   |  5 ++++
 .../MultiKeyColumnNameIteratorTest.java         |  5 ++++
 .../astyanax/MultiRowColumnIteratorTest.java    |  5 ++++
 .../index/impl/EsEntityIndexImpl.java           | 29 +++++++++++---------
 .../persistence/queue/QueueManagerTest.java     |  2 +-
 11 files changed, 80 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9e6e36dd/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index 162bcac..1c30e75 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -110,7 +111,7 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
         //
         for ( final Field field : entityFields ) {
 
-            // if it's unique, create a function to validate it and add it to the list of 
+            // if it's unique, create a function to validate it and add it to the list of
             // concurrent validations
             if ( field.isUnique() ) {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9e6e36dd/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
index 030d9d1..8d314d8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.model.ConsistencyLevel;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import org.apache.usergrid.persistence.model.field.Field;
@@ -34,7 +35,7 @@ public interface UniqueValueSerializationStrategy extends Migration {
 
     /**
      * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
-     * 
+     *
      * @param uniqueValue Object to be written
      * @return MutatationBatch that encapsulates operation, caller may or may not execute.
      */
@@ -42,16 +43,16 @@ public interface UniqueValueSerializationStrategy extends Migration {
 
     /**
      * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
-     * 
+     *
      * @param uniqueValue Object to be written
-     * @param timeToLive How long object should live in seconds 
+     * @param timeToLive How long object should live in seconds
      * @return MutatationBatch that encapsulates operation, caller may or may not execute.
      */
     public MutationBatch write( CollectionScope scope,  UniqueValue uniqueValue, Integer timeToLive );
 
     /**
      * Load UniqueValue that matches field from collection or null if that value does not exist.
-     * 
+     *
      * @param colScope Collection scope in which to look for field name/value
      * @param fields Field name/value to search for
      * @return UniqueValueSet containing fields from the collection that exist in cassandra
@@ -60,8 +61,17 @@ public interface UniqueValueSerializationStrategy extends Migration {
     public UniqueValueSet load( CollectionScope colScope, Collection<Field> fields ) throws ConnectionException;
 
     /**
+     * Load UniqueValue that matches field from collection or null if that value does not exist.
+     *
+     * @param colScope Collection scope in which to look for field name/value
+     * @param fields Field name/value to search for
+     * @return UniqueValueSet containing fields from the collection that exist in cassandra
+     * @throws ConnectionException on error connecting to Cassandra
+     */
+    public UniqueValueSet load( CollectionScope colScope, ConsistencyLevel consistencyLevel, Collection<Field> fields ) throws ConnectionException;
+    /**
      * Delete the specified Unique Value from Cassandra.
-     * 
+     *
      * @param uniqueValue Object to be deleted.
      * @return MutatationBatch that encapsulates operation, caller may or may not execute.
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9e6e36dd/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
index be95b08..6483408 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
@@ -25,6 +25,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 
+import com.netflix.astyanax.model.ConsistencyLevel;
+import org.apache.usergrid.persistence.core.astyanax.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,10 +36,6 @@ import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
-import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -72,6 +70,7 @@ public class UniqueValueSerializationStrategyImpl implements UniqueValueSerializ
                     ENTITY_VERSION_SER );
 
     protected final Keyspace keyspace;
+    private final CassandraFig cassandraFig;
 
 
     /**
@@ -80,7 +79,8 @@ public class UniqueValueSerializationStrategyImpl implements UniqueValueSerializ
      * @param keyspace Keyspace in which to store Unique Values.
      */
     @Inject
-    public UniqueValueSerializationStrategyImpl( final Keyspace keyspace ) {
+    public UniqueValueSerializationStrategyImpl( final Keyspace keyspace, final CassandraFig cassandraFig) {
+        this.cassandraFig = cassandraFig;
         this.keyspace = keyspace;
     }
 
@@ -171,8 +171,13 @@ public class UniqueValueSerializationStrategyImpl implements UniqueValueSerializ
     }
 
 
+
+    @Override
+    public UniqueValueSet load(final CollectionScope colScope, final Collection<Field> fields ) throws ConnectionException{
+        return load(colScope,ConsistencyLevel.valueOf(cassandraFig.getReadCL()), fields);
+    }
     @Override
-    public UniqueValueSet load(final CollectionScope colScope, final Collection<Field> fields )
+    public UniqueValueSet load(final CollectionScope colScope, final ConsistencyLevel consistencyLevel, final Collection<Field> fields )
             throws ConnectionException {
 
         Preconditions.checkNotNull( fields, "fields are required" );
@@ -198,7 +203,7 @@ public class UniqueValueSerializationStrategyImpl implements UniqueValueSerializ
         final UniqueValueSetImpl uniqueValueSet = new UniqueValueSetImpl( fields.size() );
 
         Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Field>>, EntityVersion>> results =
-                keyspace.prepareQuery( CF_UNIQUE_VALUES ).getKeySlice( keys )
+                keyspace.prepareQuery( CF_UNIQUE_VALUES ).setConsistencyLevel(consistencyLevel).getKeySlice( keys )
                         .withColumnRange( new RangeBuilder().setLimit( 1 ).build() ).execute().getResult().iterator();
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9e6e36dd/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
index ad4463a..817aee2 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
@@ -37,6 +37,12 @@ public interface CassandraConfig {
     public ConsistencyLevel getReadCL();
 
     /**
+     * Get the currently configured ReadCL that is more consistent than getReadCL
+     * @return
+     */
+    public ConsistencyLevel getConsistentReadCL();
+
+    /**
      * Get the currently configured write CL
      * @return
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9e6e36dd/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
index ce80ea2..17b91c6 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
@@ -39,6 +39,7 @@ public class CassandraConfigImpl implements CassandraConfig {
     private ConsistencyLevel readCl;
     private ConsistencyLevel writeCl;
     private int[] shardSettings;
+    private ConsistencyLevel consistentCl;
 
 
     @Inject
@@ -50,7 +51,7 @@ public class CassandraConfigImpl implements CassandraConfig {
 
         this.shardSettings = parseShardSettings( cassandraFig.getShardValues() );
 
-
+        this.consistentCl = ConsistencyLevel.valueOf(cassandraFig.getConsistentReadCL());
 
         //add the listeners to update the values
         cassandraFig.addPropertyChangeListener( new PropertyChangeListener() {
@@ -78,7 +79,10 @@ public class CassandraConfigImpl implements CassandraConfig {
         return readCl;
     }
 
-
+    @Override
+    public ConsistencyLevel getConsistentReadCL() {
+        return consistentCl;
+    }
     @Override
     public ConsistencyLevel getWriteCL() {
         return writeCl;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9e6e36dd/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
index 1208a1a..66a716c 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
@@ -33,6 +33,8 @@ import org.safehaus.guicyfig.Key;
 public interface CassandraFig extends GuicyFig {
 
 
+    public static final String READ_CONSISTENT_CL = "usergrid.consistent.read.cl";
+
     public static final String READ_CL = "usergrid.read.cl";
 
     public static final String WRITE_CL = "usergrid.write.cl";
@@ -77,11 +79,14 @@ public interface CassandraFig extends GuicyFig {
     @Default( "false" )
     boolean isEmbedded();
 
-
-    @Default("CL_QUORUM")
+    @Default("CL_ONE")
     @Key(READ_CL)
     String getReadCL();
 
+    @Default("CL_LOCAL_ONE")
+    @Key(READ_CONSISTENT_CL)
+    String getConsistentReadCL();
+
     @Default("CL_QUORUM")
     @Key(WRITE_CL)
     String getWriteCL();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9e6e36dd/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
index 547c504..0b0cbab 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
@@ -79,6 +79,11 @@ public class ColumnNameIteratorTest {
                 return ConsistencyLevel.CL_QUORUM;
             }
 
+            @Override
+            public ConsistencyLevel getConsistentReadCL() {
+                return ConsistencyLevel.CL_LOCAL_ONE;
+            }
+
 
             @Override
             public ConsistencyLevel getWriteCL() {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9e6e36dd/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
index a96db39..964c04a 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
@@ -83,6 +83,11 @@ public class MultiKeyColumnNameIteratorTest {
                 return ConsistencyLevel.CL_QUORUM;
             }
 
+            @Override
+            public ConsistencyLevel getConsistentReadCL() {
+                return ConsistencyLevel.CL_LOCAL_ONE;
+            }
+
 
             @Override
             public ConsistencyLevel getWriteCL() {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9e6e36dd/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
index 4530d4b..11d34a4 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
@@ -86,6 +86,11 @@ public class MultiRowColumnIteratorTest {
                 return ConsistencyLevel.CL_QUORUM;
             }
 
+            @Override
+            public ConsistencyLevel getConsistentReadCL() {
+                return ConsistencyLevel.CL_LOCAL_ONE;
+            }
+
 
             @Override
             public ConsistencyLevel getWriteCL() {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9e6e36dd/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index fa50734..9343d39 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -113,6 +113,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
     private final Timer searchTimer;
     private final Timer allVersionsTimerFuture;
     private final Timer deletePreviousTimerFuture;
+    private final Meter errorMeter;
 
     /**
      * We purposefully make this per instance. Some indexes may work, while others may fail
@@ -174,31 +175,33 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
         this.aliasCache = indexCache;
         this.metricsFactory = metricsFactory;
         this.addTimer = metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "es.entity.index.add.index.timer" );
+            .getTimer( EsEntityIndexImpl.class, "add.timer" );
         this.removeAliasTimer = metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "es.entity.index.remove.index.alias.timer" );
+            .getTimer( EsEntityIndexImpl.class, "remove.alias.timer" );
         this.addReadAliasTimer = metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "es.entity.index.add.read.alias.timer" );
+            .getTimer( EsEntityIndexImpl.class, "add.read.alias.timer" );
         this.addWriteAliasTimer = metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "es.entity.index.add.write.alias.timer" );
+            .getTimer( EsEntityIndexImpl.class, "add.write.alias.timer" );
         this.mappingTimer = metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "es.entity.index.create.mapping.timer" );
+            .getTimer( EsEntityIndexImpl.class, "create.mapping.timer" );
         this.refreshTimer = metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "es.entity.index.refresh.timer" );
+            .getTimer( EsEntityIndexImpl.class, "refresh.timer" );
         this.searchTimer =metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "es.entity.index.search.timer" );
+            .getTimer( EsEntityIndexImpl.class, "search.timer" );
         this.cursorTimer = metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "es.entity.index.search.cursor.timer" );
+            .getTimer( EsEntityIndexImpl.class, "search.cursor.timer" );
         this.getVersionsTimer =metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "es.entity.index.get.versions.timer" );
+            .getTimer( EsEntityIndexImpl.class, "get.versions.timer" );
         this.allVersionsTimer =  metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "es.entity.index.delete.all.versions.timer" );
+            .getTimer( EsEntityIndexImpl.class, "delete.all.versions.timer" );
         this.deletePreviousTimer = metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "es.entity.index.delete.previous.versions.timer" );
+            .getTimer( EsEntityIndexImpl.class, "delete.previous.versions.timer" );
         this.allVersionsTimerFuture =  metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "es.entity.index.delete.all.versions.timer.future" );
+            .getTimer( EsEntityIndexImpl.class, "delete.all.versions.timer.future" );
         this.deletePreviousTimerFuture = metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "es.entity.index.delete.previous.versions.timer.future" );
+            .getTimer( EsEntityIndexImpl.class, "delete.previous.versions.timer.future" );
+
+        this.errorMeter = metricsFactory.getMeter(EsEntityIndexImpl.class,"errors");
 
         final MapScope mapScope = new MapScopeImpl( appScope.getApplication(), "cursorcache" );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9e6e36dd/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 935fd16..e20f87c 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -63,7 +63,7 @@ public class QueueManagerTest {
     @Before
     public void mockApp() {
         this.scope = new QueueScopeImpl( new SimpleId( "application" ), "testQueue" );
-        qm = qmf.getQueueManager(scope);
+//        qm = qmf.getQueueManager(scope);
         queueScopeFactory = new QueueScopeFactoryImpl(queueFig);
     }
 


[02/11] incubator-usergrid git commit: Merge branch 'USERGRID-466' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-466

Posted by to...@apache.org.
Merge branch 'USERGRID-466' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-466


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/fde59b70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/fde59b70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/fde59b70

Branch: refs/heads/USERGRID-466
Commit: fde59b709ca242836dccd78e9af6d711a155a130
Parents: 9e6e36d 19c6ad0
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Mar 12 17:29:59 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Mar 12 17:29:59 2015 -0600

----------------------------------------------------------------------
 .../src/main/groovy/configure_usergrid.groovy   |   8 +-
 stack/awscluster/ugcluster-cf.json              | 150 ++-----------------
 .../usergrid/persistence/index/IndexFig.java    |   9 ++
 .../index/impl/EsIndexBufferConsumerImpl.java   |  55 +++++--
 .../index/guice/TestIndexModule.java            |   4 +-
 5 files changed, 75 insertions(+), 151 deletions(-)
----------------------------------------------------------------------



[10/11] incubator-usergrid git commit: Moved task generation into single factory to clean up code

Posted by to...@apache.org.
Moved task generation into single factory to clean up code

Fixes repair by version

Fixes incorrect task invocation during writes

Fixes swallowed exception bug

Adds on error processing to QueueConsumer


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/f20dfd3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/f20dfd3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/f20dfd3c

Branch: refs/heads/USERGRID-466
Commit: f20dfd3cbee1f3abb09ae04036b9e747d2485397
Parents: 205e0e0
Author: Todd Nine <tn...@apigee.com>
Authored: Sun Mar 15 13:57:24 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Sun Mar 15 13:57:24 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  2 +
 .../events/EntityDeletedHandler.java            | 58 ++++++++++-------
 .../events/EntityVersionCreatedHandler.java     | 58 +++++++++++------
 .../events/EntityVersionDeletedHandler.java     | 54 +++++++++-------
 .../corepersistence/StaleIndexCleanupTest.java  | 31 +++++++--
 .../PerformanceEntityRebuildIndexTest.java      |  2 +
 .../collection/guice/CollectionModule.java      | 22 ++-----
 .../EntityCollectionManagerFactoryImpl.java     | 54 +++++++---------
 .../impl/EntityCollectionManagerImpl.java       | 40 +++++-------
 .../collection/impl/EntityDeletedTask.java      | 21 +++---
 .../impl/EntityVersionCleanupTask.java          | 41 ++++++------
 .../impl/EntityVersionCleanupTaskTest.java      | 68 +++++++++++---------
 .../index/impl/EsEntityIndexBatchImpl.java      |  2 +-
 .../index/impl/EsEntityIndexImpl.java           | 13 ----
 .../index/impl/EsIndexBufferConsumerImpl.java   |  2 +-
 15 files changed, 252 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index d808ae5..e2b7bd1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -620,6 +620,8 @@ public class CpEntityManager implements EntityManager {
                 WriteUniqueVerifyException wuve = ( WriteUniqueVerifyException ) hre.getCause();
                 handleWriteUniqueVerifyException( entity, wuve );
             }
+
+            throw hre;
         }
 
         // update in all containing collections and connection indexes

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
index 33cc988..94ef5e4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
@@ -17,52 +17,66 @@
  */
 package org.apache.usergrid.corepersistence.events;
 
-import com.google.inject.Inject;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.event.EntityDeleted;
-import org.apache.usergrid.persistence.model.entity.Id;
 
 import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
-import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
 import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.event.EntityDeleted;
 import org.apache.usergrid.persistence.index.EntityIndex;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
 
 
 /**
  * Delete all Query Index indexes associated with an Entity that has just been deleted.
  */
+@Singleton
 public class EntityDeletedHandler implements EntityDeleted {
-    private static final Logger logger = LoggerFactory.getLogger(EntityDeletedHandler.class );
+    private static final Logger logger = LoggerFactory.getLogger( EntityDeletedHandler.class );
+
+
+    private final EntityManagerFactory emf;
+
 
     @Inject
-    EntityManagerFactory emf;
+    public EntityDeletedHandler( final EntityManagerFactory emf ) {this.emf = emf;}
 
 
     @Override
-    public void deleted(CollectionScope scope, Id entityId, UUID version) {
+    public void deleted( CollectionScope scope, Id entityId, UUID version ) {
 
         // This check is for testing purposes and for a test that to be able to dynamically turn
         // off and on delete previous versions so that it can test clean-up on read.
-        if ( System.getProperty( EVENTS_DISABLED, "false" ).equals( "true" )) {
+        if ( System.getProperty( EVENTS_DISABLED, "false" ).equals( "true" ) ) {
             return;
         }
 
-        logger.debug("Handling deleted event for entity {}:{} v {} "
-                + "scope\n   name: {}\n   owner: {}\n   app: {}",
+        logger.debug( "Handling deleted event for entity {}:{} v {} " + "scope\n   name: {}\n   owner: {}\n   app: {}",
             new Object[] {
-                entityId.getType(),
-                entityId.getUuid(),
-                version,
-                scope.getName(),
-                scope.getOwner(),
-                scope.getApplication()});
+                entityId.getType(), entityId.getUuid(), version, scope.getName(), scope.getOwner(),
+                scope.getApplication()
+            } );
+
+        CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
+        final EntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
+
+        final IndexScope indexScope =
+            new IndexScopeImpl( new SimpleId( scope.getOwner().getUuid(), scope.getOwner().getType() ),
+                scope.getName() );
 
-        CpEntityManagerFactory cpemf = (CpEntityManagerFactory)emf;
-        final EntityIndex ei = cpemf.getManagerCache().getEntityIndex(scope);
 
-//        ei.deleteAllVersionsOfEntity( entityId );
+        ei.createBatch().deindex( indexScope, entityId, version ).execute();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
index 590df61..6270501 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
@@ -18,6 +18,8 @@
 package org.apache.usergrid.corepersistence.events;
 
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,6 +27,7 @@ import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
 import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
 import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.model.entity.Entity;
@@ -35,34 +38,51 @@ import org.apache.usergrid.persistence.model.entity.Entity;
  * updated by the Collections module and we react by calling the Query Index module and removing
  * any indexes that exist for previous versions of the the Entity.
  */
+@Singleton
 public class EntityVersionCreatedHandler implements EntityVersionCreated {
     private static final Logger logger = LoggerFactory.getLogger(EntityVersionCreatedHandler.class );
 
+
+    private final EntityManagerFactory emf;
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+
+
     @Inject
-    EntityManagerFactory emf;
+    public EntityVersionCreatedHandler( final EntityManagerFactory emf,
+                                        final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
+        this.emf = emf;
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+    }
 
 
     @Override
     public void versionCreated( final CollectionScope scope, final Entity entity ) {
+        //no-op, we're not migrating properly to this.  Make this an event
 
-        // This check is for testing purposes and for a test that to be able to dynamically turn
-        // off and on delete previous versions so that it can test clean-up on read.
-        if ( System.getProperty( EVENTS_DISABLED, "false" ).equals( "true" )) {
-            return;
-        }
-
-        logger.debug("Handling versionCreated for entity {}:{} v {} "
-            + "scope\n   name: {}\n   owner: {}\n   app: {}",
-            new Object[] {
-                entity.getId().getType(),
-                entity.getId().getUuid(),
-                entity.getVersion(),
-                scope.getName(),
-                scope.getOwner(),
-                scope.getApplication()});
-
-        CpEntityManagerFactory cpemf = (CpEntityManagerFactory)emf;
-        final EntityIndex ei = cpemf.getManagerCache().getEntityIndex(scope);
+//        // This check is for testing purposes and for a test that to be able to dynamically turn
+//        // off and on delete previous versions so that it can test clean-up on read.
+//        if ( System.getProperty( EVENTS_DISABLED, "false" ).equals( "true" )) {
+//            return;
+//        }
+//
+//        logger.debug("Handling versionCreated for entity {}:{} v {} "
+//            + "scope\n   name: {}\n   owner: {}\n   app: {}",
+//            new Object[] {
+//                entity.getId().getType(),
+//                entity.getId().getUuid(),
+//                entity.getVersion(),
+//                scope.getName(),
+//                scope.getOwner(),
+//                scope.getApplication()});
+//
+//        CpEntityManagerFactory cpemf = (CpEntityManagerFactory)emf;
+//        final EntityIndex ei = cpemf.getManagerCache().getEntityIndex(scope);
+//
+//
+//
+//
+//
+//
 
 //        ei.deletePreviousVersions( entity.getId(), entity.getVersion() );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
index 127ae46..01848cb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
@@ -18,6 +18,8 @@
 package org.apache.usergrid.corepersistence.events;
 
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
 import java.util.List;
 import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
 import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
@@ -34,6 +36,10 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Action2;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
@@ -43,14 +49,17 @@ import rx.schedulers.Schedulers;
  * TODO: do we need this? Don't our version-created and entity-deleted handlers take care of this?
  * If we do need it then it should be wired in via GuiceModule in the corepersistence package.
  */
+@Singleton
 public class EntityVersionDeletedHandler implements EntityVersionDeleted {
     private static final Logger logger = LoggerFactory.getLogger(EntityVersionDeletedHandler.class );
 
-    @Inject
-    private SerializationFig serializationFig;
+
+
+
+    private final EntityManagerFactory emf;
 
     @Inject
-    private EntityManagerFactory emf;
+    public EntityVersionDeletedHandler( final EntityManagerFactory emf ) {this.emf = emf;}
 
 
     @Override
@@ -63,40 +72,35 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
             return;
         }
 
-        logger.debug("Handling versionDeleted count={} event for entity {}:{} v {} "
-                + "scope\n   name: {}\n   owner: {}\n   app: {}",
-            new Object[] {
-                entityVersions.size(),
-                entityId.getType(),
-                entityId.getUuid(),
-                scope.getName(),
-                scope.getOwner(),
-                scope.getApplication()});
+        if(logger.isDebugEnabled()) {
+            logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} " + "scope\n   name: {}\n   owner: {}\n   app: {}",
+                new Object[] {
+                    entityVersions.size(), entityId.getType(), entityId.getUuid(), scope.getName(), scope.getOwner(),
+                    scope.getApplication()
+                } );
+        }
 
         CpEntityManagerFactory cpemf = (CpEntityManagerFactory)emf;
 
         final EntityIndex ei = cpemf.getManagerCache().getEntityIndex(scope);
 
-        final EntityIndexBatch eibatch = ei.createBatch();
-
         final IndexScope indexScope = new IndexScopeImpl(
                 new SimpleId(scope.getOwner().getUuid(), scope.getOwner().getType()),
                 scope.getName()
         );
 
-        rx.Observable.from(entityVersions)
-            .subscribeOn(Schedulers.io())
-            .buffer(serializationFig.getBufferSize())
-            .map(new Func1<List<MvccEntity>, List<MvccEntity>>() {
+        Observable.from( entityVersions )
+            .collect( ei.createBatch(), new Action2<EntityIndexBatch, MvccEntity>() {
                 @Override
-                public List<MvccEntity> call(List<MvccEntity> entityList) {
-                    for (MvccEntity entity : entityList) {
-                        eibatch.deindex(indexScope, entityId, entity.getVersion());
-                    }
-                    eibatch.execute();
-                    return entityList;
+                public void call( final EntityIndexBatch entityIndexBatch, final MvccEntity mvccEntity ) {
+                    entityIndexBatch.deindex( indexScope, mvccEntity.getId(), mvccEntity.getVersion() );
                 }
-            }).toBlocking().last();
+            } ).doOnNext( new Action1<EntityIndexBatch>() {
+            @Override
+            public void call( final EntityIndexBatch entityIndexBatch ) {
+                entityIndexBatch.execute();
+            }
+        } ).toBlocking().last();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 8d31b69..ac04dcc 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -246,7 +246,8 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
      * Test that the EntityDeleteImpl cleans up stale indexes on delete. Ensures that when an
      * entity is deleted its old indexes are cleared from ElasticSearch.
      */
-    @Test(timeout=30000)
+//    @Test(timeout=30000)
+    @Test
     public void testCleanupOnDelete() throws Exception {
 
         logger.info("Started testStaleIndexCleanup()");
@@ -316,11 +317,19 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
 
         // wait for indexes to be cleared for the deleted entities
         count = 0;
+
+
+        //we can't use our candidate result sets here.  The repair won't happen since we now have orphaned documents in our index
+        //use the EM so the repair process happens
+
+        Results results = null;
         do {
-            Thread.sleep(100);
+            //trigger the repair
+            results = queryCollectionEm("things", "select *");
             crs = queryCollectionCp("things", "thing", "select *");
-            em.refreshIndex();
-        } while ( crs.size() > 0 && count++ < 15 );
+            Thread.sleep(100);
+
+        } while ((results.hasCursor() || crs.size() > 0) && count++ < 2000 );
 
         Assert.assertEquals( "Expect no candidates", 0, crs.size() );
     }
@@ -436,4 +445,18 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
 
         return ei.search( is, SearchTypes.fromTypes( type ), rcq );
     }
+
+    /**
+        * Run the query through the EntityManager rather than directly against Core Persistence.
+        * The caller uses this to trigger the repair of stale index entries.
+        */
+       private Results queryCollectionEm( final String collName,  final String query ) throws Exception {
+
+           EntityManager em = app.getEntityManager();
+
+
+           final Results results = em.searchCollection( em.getApplicationRef(), collName, Query.fromQL( query ).withLimit( 10000 ) );
+
+           return results;
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index def9ed5..f1f165d 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -362,6 +362,8 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
             registry.remove( meterName );
             logger.info("Rebuilt index");
 
+            setup.getEmf().refreshIndex();
+
         } catch (Exception ex) {
             logger.error("Error rebuilding index", ex);
             fail();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 7cd383a..a73d7a7 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -18,30 +18,24 @@
 package org.apache.usergrid.persistence.collection.guice;
 
 
-
 import org.safehaus.guicyfig.GuicyFigModule;
 
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerSync;
-import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
-import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
-import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
+import org.apache.usergrid.persistence.collection.impl.EntityVersionTaskFactory;
+import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
 import org.apache.usergrid.persistence.collection.event.EntityDeleted;
+import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
 import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerFactoryImpl;
-import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerImpl;
-import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerSyncImpl;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGenerator;
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGeneratorImpl;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyImpl;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyImpl;
 import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
 import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
 import org.apache.usergrid.persistence.core.task.TaskExecutor;
@@ -52,8 +46,6 @@ import com.google.inject.Provides;
 import com.google.inject.Singleton;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.multibindings.Multibinder;
-import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
-import org.apache.usergrid.persistence.model.entity.Entity;
 
 
 /**
@@ -72,9 +64,7 @@ public class CollectionModule extends AbstractModule {
         install( new SerializationModule() );
         install( new ServiceModule() );
 
-        install ( new FactoryModuleBuilder().build( EntityVersionCleanupFactory.class ));
-        install ( new FactoryModuleBuilder().build( EntityDeletedFactory.class));
-        install ( new FactoryModuleBuilder().build( EntityVersionCreatedFactory.class ));
+        install ( new FactoryModuleBuilder().build( EntityVersionTaskFactory.class ));
 
         // users of this module can add their own implemementations
         // for more information: https://github.com/google/guice/wiki/Multibindings

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index bb1b56a..23e375d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -19,23 +19,6 @@
 
 package org.apache.usergrid.persistence.collection.impl;
 
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.Keyspace;
-import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
-import org.apache.usergrid.persistence.collection.guice.Write;
-import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
-import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
-import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.core.guice.ProxyImpl;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
 
 import java.util.concurrent.ExecutionException;
 
@@ -43,17 +26,32 @@ import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerSync;
-import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
-import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
-import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
 import org.apache.usergrid.persistence.collection.cache.CachedEntityCollectionManager;
 import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
+import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
+import org.apache.usergrid.persistence.collection.guice.Write;
+import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.core.guice.ProxyImpl;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
 
 
 
@@ -76,9 +74,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
     private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
     private final Keyspace keyspace;
-    private final EntityVersionCleanupFactory entityVersionCleanupFactory;
-    private final EntityVersionCreatedFactory entityVersionCreatedFactory;
-    private final EntityDeletedFactory entityDeletedFactory;
+    private final EntityVersionTaskFactory entityVersionTaskFactory;
     private final TaskExecutor taskExecutor;
     private final EntityCacheFig entityCacheFig;
     private final MetricsFactory metricsFactory;
@@ -92,8 +88,8 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
                             final EntityCollectionManager target = new EntityCollectionManagerImpl( writeStart, writeUpdate, writeVerifyUnique,
                                 writeOptimisticVerify, writeCommit, rollback, markStart, markCommit,
                                 entitySerializationStrategy, uniqueValueSerializationStrategy,
-                                mvccLogEntrySerializationStrategy, keyspace, entityVersionCleanupFactory,
-                                entityVersionCreatedFactory, entityDeletedFactory, taskExecutor, scope, metricsFactory );
+                                mvccLogEntrySerializationStrategy, keyspace, entityVersionTaskFactory,
+                                taskExecutor, scope, metricsFactory );
 
 
                             final EntityCollectionManager proxy = new CachedEntityCollectionManager(entityCacheFig, target  );
@@ -115,9 +111,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
                                                final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
                                                final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
                                                final Keyspace keyspace,
-                                               final EntityVersionCleanupFactory entityVersionCleanupFactory,
-                                               final EntityVersionCreatedFactory entityVersionCreatedFactory,
-                                               final EntityDeletedFactory entityDeletedFactory,
+                                               final EntityVersionTaskFactory entityVersionTaskFactory,
                                                @CollectionTaskExecutor final TaskExecutor taskExecutor,
                                               final EntityCacheFig entityCacheFig,
                                                MetricsFactory metricsFactory) {
@@ -134,9 +128,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
         this.keyspace = keyspace;
-        this.entityVersionCleanupFactory = entityVersionCleanupFactory;
-        this.entityVersionCreatedFactory = entityVersionCreatedFactory;
-        this.entityDeletedFactory = entityDeletedFactory;
+        this.entityVersionTaskFactory = entityVersionTaskFactory;
         this.taskExecutor = taskExecutor;
         this.entityCacheFig = entityCacheFig;
         this.metricsFactory = metricsFactory;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 50b581a..92b07f0 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -23,9 +23,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,6 +31,7 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntitySet;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.VersionSet;
+import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
 import org.apache.usergrid.persistence.collection.guice.Write;
 import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
@@ -47,11 +45,13 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
 import org.apache.usergrid.persistence.core.guice.ProxyImpl;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
@@ -59,6 +59,8 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.field.Field;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
@@ -68,12 +70,6 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.model.ColumnFamily;
 import com.netflix.astyanax.model.CqlResult;
 import com.netflix.astyanax.serializers.StringSerializer;
-import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
-import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
-import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
-import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
 
 import rx.Notification;
 import rx.Observable;
@@ -112,9 +108,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     private final MvccEntitySerializationStrategy entitySerializationStrategy;
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
 
-    private final EntityVersionCleanupFactory entityVersionCleanupFactory;
-    private final EntityVersionCreatedFactory entityVersionCreatedFactory;
-    private final EntityDeletedFactory entityDeletedFactory;
+    private final EntityVersionTaskFactory entityVersionTaskFactory;
     private final TaskExecutor taskExecutor;
 
     private final Keyspace keyspace;
@@ -144,9 +138,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         final UniqueValueSerializationStrategy     uniqueValueSerializationStrategy,
         final MvccLogEntrySerializationStrategy    mvccLogEntrySerializationStrategy,
         final Keyspace                             keyspace,
-        final EntityVersionCleanupFactory          entityVersionCleanupFactory,
-        final EntityVersionCreatedFactory          entityVersionCreatedFactory,
-        final EntityDeletedFactory                 entityDeletedFactory,
+        final EntityVersionTaskFactory entityVersionTaskFactory,
         @CollectionTaskExecutor final TaskExecutor taskExecutor,
         @Assisted final CollectionScope            collectionScope,
         final MetricsFactory metricsFactory
@@ -170,9 +162,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
         this.keyspace = keyspace;
 
-        this.entityVersionCleanupFactory = entityVersionCleanupFactory;
-        this.entityVersionCreatedFactory = entityVersionCreatedFactory;
-        this.entityDeletedFactory = entityDeletedFactory;
+        this.entityVersionTaskFactory = entityVersionTaskFactory;
         this.taskExecutor = taskExecutor;
 
         this.collectionScope = collectionScope;
@@ -216,8 +206,9 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
             @Override
             public void call(final Entity entity) {
                 //TODO fire the created task first then the entityVersioncleanup
-                taskExecutor.submit(entityVersionCreatedFactory.getTask(collectionScope,entity));
-                taskExecutor.submit(entityVersionCleanupFactory.getTask(collectionScope, entityId,entity.getVersion()));
+                taskExecutor.submit( entityVersionTaskFactory.getCreatedTask( collectionScope, entity ));
+                taskExecutor.submit( entityVersionTaskFactory.getCleanupTask( collectionScope, entityId,
+                    entity.getVersion(), false ));
                 //post-processing to come later. leave it empty for now.
             }
         }).doOnError(rollback)
@@ -252,8 +243,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                      @Override
                      public Id call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) {
                          MvccEntity entity = mvccEntityCollectionIoEvent.getEvent();
-                         Task<Void> task = entityDeletedFactory
-                             .getTask(collectionScope, entity.getId(), entity.getVersion());
+                         Task<Void> task = entityVersionTaskFactory
+                             .getDeleteTask( collectionScope, entity.getId(), entity.getVersion() );
                          taskExecutor.submit(task);
                          return entity.getId();
                      }
@@ -400,8 +391,9 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                 logger.debug("sending entity to the queue");
 
                 //we an update, signal the fix
-                taskExecutor.submit(entityVersionCreatedFactory.getTask(collectionScope, entity));
+                taskExecutor.submit( entityVersionTaskFactory.getCreatedTask( collectionScope, entity ));
 
+                taskExecutor.submit( entityVersionTaskFactory.getCleanupTask( collectionScope, entity.getId(), entity.getVersion(), false) );
                 //TODO T.N Change this to fire a task
                 //                Observable.from( new CollectionIoEvent<Id>(collectionScope,
                 // entityId ) ).map( load ).subscribeOn( Schedulers.io() ).subscribe();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
index 9ff4f56..83f165d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
@@ -22,7 +22,6 @@ import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.netflix.astyanax.MutationBatch;
 import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
 import org.apache.usergrid.persistence.collection.event.EntityDeleted;
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
@@ -46,7 +45,7 @@ import org.apache.usergrid.persistence.core.guice.ProxyImpl;
 public class EntityDeletedTask implements Task<Void> {
     private static final Logger LOG =  LoggerFactory.getLogger(EntityDeletedTask.class);
 
-    private final EntityVersionCleanupFactory entityVersionCleanupFactory;
+    private final EntityVersionTaskFactory entityVersionTaskFactory;
     private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
     private final MvccEntitySerializationStrategy entitySerializationStrategy;
     private final Set<EntityDeleted> listeners;
@@ -56,16 +55,16 @@ public class EntityDeletedTask implements Task<Void> {
 
 
     @Inject
-    public EntityDeletedTask( 
-        EntityVersionCleanupFactory             entityVersionCleanupFactory,
+    public EntityDeletedTask(
+        EntityVersionTaskFactory entityVersionTaskFactory,
         final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
         @ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy,
         final Set<EntityDeleted>                listeners, // MUST be a set or Guice will not inject
-        @Assisted final CollectionScope         collectionScope, 
-        @Assisted final Id                      entityId, 
+        @Assisted final CollectionScope         collectionScope,
+        @Assisted final Id                      entityId,
         @Assisted final UUID                    version) {
 
-        this.entityVersionCleanupFactory = entityVersionCleanupFactory;
+        this.entityVersionTaskFactory = entityVersionTaskFactory;
         this.logEntrySerializationStrategy = logEntrySerializationStrategy;
         this.entitySerializationStrategy = entitySerializationStrategy;
         this.listeners = listeners;
@@ -81,7 +80,7 @@ public class EntityDeletedTask implements Task<Void> {
                 new Object[] { collectionScope, entityId, version }, throwable );
     }
 
-    
+
     @Override
     public Void rejected() {
         try {
@@ -94,11 +93,11 @@ public class EntityDeletedTask implements Task<Void> {
         return null;
     }
 
-    
+
     @Override
-    public Void call() throws Exception { 
+    public Void call() throws Exception {
 
-        entityVersionCleanupFactory.getTask( collectionScope, entityId, version ).call();
+        entityVersionTaskFactory.getCleanupTask( collectionScope, entityId, version, true ).call();
 
         fireEvents();
         final MutationBatch entityDelete = entitySerializationStrategy.delete(collectionScope, entityId, version);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index efecdeb..03a5ff6 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -55,7 +55,7 @@ import rx.schedulers.Schedulers;
 
 
 /**
- * Cleans up previous versions from the specified version. Note that this means the version 
+ * Cleans up previous versions from the specified version. Note that this means the version
  * passed in the io event is retained, the range is exclusive.
  */
 public class EntityVersionCleanupTask implements Task<Void> {
@@ -74,10 +74,11 @@ public class EntityVersionCleanupTask implements Task<Void> {
     private final CollectionScope scope;
     private final Id entityId;
     private final UUID version;
+    private final int numToSkip;
 
 
     @Inject
-    public EntityVersionCleanupTask( 
+    public EntityVersionCleanupTask(
         final SerializationFig serializationFig,
         final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
         @ProxyImpl final MvccEntitySerializationStrategy   entitySerializationStrategy,
@@ -86,7 +87,8 @@ public class EntityVersionCleanupTask implements Task<Void> {
         final Set<EntityVersionDeleted>         listeners, // MUST be a set or Guice will not inject
         @Assisted final CollectionScope         scope,
         @Assisted final Id                      entityId,
-        @Assisted final UUID                    version ) {
+        @Assisted final UUID                    version,
+        @Assisted final boolean includeVersion) {
 
         this.serializationFig = serializationFig;
         this.logEntrySerializationStrategy = logEntrySerializationStrategy;
@@ -97,6 +99,8 @@ public class EntityVersionCleanupTask implements Task<Void> {
         this.scope = scope;
         this.entityId = entityId;
         this.version = version;
+
+        numToSkip = includeVersion? 0: 1;
     }
 
 
@@ -136,7 +140,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
                 }
             })
             //buffer them for efficiency
-            .skip(1)
+            .skip(numToSkip)
             .buffer(serializationFig.getBufferSize()).doOnNext(
             new Action1<List<MvccEntity>>() {
                 @Override
@@ -146,27 +150,28 @@ public class EntityVersionCleanupTask implements Task<Void> {
                     final MutationBatch logBatch = keyspace.prepareMutationBatch();
 
                     for (MvccEntity mvccEntity : mvccEntities) {
-                        if (!mvccEntity.getEntity().isPresent()) {
-                            continue;
-                        }
-
                         final UUID entityVersion = mvccEntity.getVersion();
-                        final Entity entity = mvccEntity.getEntity().get();
 
-                        //remove all unique fields from the index
-                        for (final Field field : entity.getFields()) {
-                            if (!field.isUnique()) {
-                                continue;
+
+                        //if the entity is present process the fields
+                        if(mvccEntity.getEntity().isPresent()) {
+                            final Entity entity = mvccEntity.getEntity().get();
+
+                            //remove all unique fields from the index
+                            for ( final Field field : entity.getFields() ) {
+                                if ( !field.isUnique() ) {
+                                    continue;
+                                }
+                                final UniqueValue unique = new UniqueValueImpl( field, entityId, entityVersion );
+                                final MutationBatch deleteMutation =
+                                    uniqueValueSerializationStrategy.delete( scope, unique );
+                                batch.mergeShallow( deleteMutation );
                             }
-                            final UniqueValue unique = new UniqueValueImpl( field, entityId, entityVersion);
-                            final MutationBatch deleteMutation = 
-                                    uniqueValueSerializationStrategy.delete(scope,unique);
-                            batch.mergeShallow(deleteMutation);
                         }
 
                         final MutationBatch entityDelete = entitySerializationStrategy
                                 .delete(scope, entityId, mvccEntity.getVersion());
-                        entityBatch.mergeShallow(entityDelete);
+                        entityBatch.mergeShallow( entityDelete );
                         final MutationBatch logDelete = logEntrySerializationStrategy
                                 .delete(scope, entityId, version);
                         logBatch.mergeShallow(logDelete);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
index d0a87c3..f2fb95b 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@ -79,7 +79,7 @@ public class EntityVersionCleanupTaskTest {
 
 
     @Test(timeout=10000)
-    public void noListenerOneVersion() 
+    public void noListenerOneVersion()
             throws ExecutionException, InterruptedException, ConnectionException {
 
 
@@ -109,7 +109,7 @@ public class EntityVersionCleanupTaskTest {
 
         final Id applicationId = new SimpleId( "application" );
 
-        final CollectionScope appScope = new CollectionScopeImpl( 
+        final CollectionScope appScope = new CollectionScopeImpl(
                 applicationId, applicationId, "users" );
 
         final Id entityId = new SimpleId( "user" );
@@ -133,7 +133,8 @@ public class EntityVersionCleanupTaskTest {
                         listeners,
                         appScope,
                         entityId,
-                        version
+                        version,
+                        false
                 );
 
         final MutationBatch newBatch = mock( MutationBatch.class );
@@ -148,10 +149,10 @@ public class EntityVersionCleanupTaskTest {
 
         final List<MvccEntity> mel = new ArrayList<MvccEntity>();
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 
         when( ess.loadDescendingHistory(
@@ -175,7 +176,7 @@ public class EntityVersionCleanupTaskTest {
      * Tests the cleanup task on the first version created
      */
     @Test(timeout=10000)
-    public void noListenerNoVersions() 
+    public void noListenerNoVersions()
             throws ExecutionException, InterruptedException, ConnectionException {
 
 
@@ -208,14 +209,14 @@ public class EntityVersionCleanupTaskTest {
         final Id applicationId = new SimpleId( "application" );
 
 
-        final CollectionScope appScope = new CollectionScopeImpl( 
+        final CollectionScope appScope = new CollectionScopeImpl(
                 applicationId, applicationId, "users" );
 
         final Id entityId = new SimpleId( "user" );
 
 
         //mock up a single log entry for our first test
-        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( 
+        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock(
                 mvccLogEntrySerializationStrategy, appScope, entityId, 1 );
 
 
@@ -233,7 +234,8 @@ public class EntityVersionCleanupTaskTest {
                         listeners,
                         appScope,
                         entityId,
-                        version
+                        version,
+                        false
                 );
 
         final MutationBatch batch = mock( MutationBatch.class );
@@ -249,10 +251,10 @@ public class EntityVersionCleanupTaskTest {
 
         final List<MvccEntity> mel = new ArrayList<MvccEntity>();
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 
         when( ess.loadDescendingHistory( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
@@ -267,7 +269,7 @@ public class EntityVersionCleanupTaskTest {
 
 
         // These last two verify statements do not make sense. We cannot assert that the entity
-        // and log batches are never called. Even if there are no listeners the entity delete 
+        // and log batches are never called. Even if there are no listeners the entity delete
         // cleanup task will still run to do the normal cleanup.
         //
         // verify( entityBatch, never() ).execute();
@@ -276,7 +278,7 @@ public class EntityVersionCleanupTaskTest {
 
 
     @Test(timeout=10000)
-    public void singleListenerSingleVersion() 
+    public void singleListenerSingleVersion()
             throws ExecutionException, InterruptedException, ConnectionException {
 
 
@@ -317,14 +319,14 @@ public class EntityVersionCleanupTaskTest {
         final Id applicationId = new SimpleId( "application" );
 
 
-        final CollectionScope appScope = new CollectionScopeImpl( 
+        final CollectionScope appScope = new CollectionScopeImpl(
                 applicationId, applicationId, "users" );
 
         final Id entityId = new SimpleId( "user" );
 
 
         //mock up a single log entry for our first test
-        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( 
+        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock(
                 mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
 
 
@@ -343,7 +345,8 @@ public class EntityVersionCleanupTaskTest {
                         listeners,
                         appScope,
                         entityId,
-                        version
+                        version,
+                        false
                 );
 
         final MutationBatch batch = mock( MutationBatch.class );
@@ -361,10 +364,10 @@ public class EntityVersionCleanupTaskTest {
 
         final List<MvccEntity> mel = new ArrayList<MvccEntity>();
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 
         when( ess.loadDescendingHistory( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
@@ -421,7 +424,7 @@ public class EntityVersionCleanupTaskTest {
         final int sizeToReturn = 10;
 
 
-        final CountDownLatch latch = new CountDownLatch( 
+        final CountDownLatch latch = new CountDownLatch(
                 sizeToReturn/serializationFig.getBufferSize() * 3 );
 
         final EntityVersionDeletedTest listener1 = new EntityVersionDeletedTest( latch );
@@ -436,13 +439,13 @@ public class EntityVersionCleanupTaskTest {
 
         final Id applicationId = new SimpleId( "application" );
 
-        final CollectionScope appScope = new CollectionScopeImpl( 
+        final CollectionScope appScope = new CollectionScopeImpl(
                 applicationId, applicationId, "users" );
 
         final Id entityId = new SimpleId( "user" );
 
         // mock up a single log entry for our first test
-        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( 
+        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock(
                 mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
 
         final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
@@ -456,7 +459,8 @@ public class EntityVersionCleanupTaskTest {
                         listeners,
                         appScope,
                         entityId,
-                        version
+                        version,
+                        false
                 );
 
         final MutationBatch batch = mock( MutationBatch.class );
@@ -474,10 +478,10 @@ public class EntityVersionCleanupTaskTest {
 
         Entity entity = new Entity( entityId );
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.of(entity)) );
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.of(entity)) );
 
         when( ess.loadDescendingHistory( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
@@ -543,7 +547,7 @@ public class EntityVersionCleanupTaskTest {
 
         final int listenerCount = 5;
 
-        final CountDownLatch latch = new CountDownLatch( 
+        final CountDownLatch latch = new CountDownLatch(
                 sizeToReturn/serializationFig.getBufferSize() * listenerCount );
         final Semaphore waitSemaphore = new Semaphore( 0 );
 
@@ -565,14 +569,14 @@ public class EntityVersionCleanupTaskTest {
         final Id applicationId = new SimpleId( "application" );
 
 
-        final CollectionScope appScope = new CollectionScopeImpl( 
+        final CollectionScope appScope = new CollectionScopeImpl(
                 applicationId, applicationId, "users" );
 
         final Id entityId = new SimpleId( "user" );
 
 
         //mock up a single log entry for our first test
-        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( 
+        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock(
                 mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
 
 
@@ -591,7 +595,8 @@ public class EntityVersionCleanupTaskTest {
                         listeners,
                         appScope,
                         entityId,
-                        version
+                        version,
+                    false
                 );
 
         final MutationBatch batch = mock( MutationBatch.class );
@@ -714,7 +719,8 @@ public class EntityVersionCleanupTaskTest {
                         listeners,
                         appScope,
                         entityId,
-                        version
+                        version,
+                        false
                 );
 
         final MutationBatch batch = mock( MutationBatch.class );
@@ -768,7 +774,7 @@ public class EntityVersionCleanupTaskTest {
 
 
         @Override
-        public void versionDeleted( final CollectionScope scope, final Id entityId, 
+        public void versionDeleted( final CollectionScope scope, final Id entityId,
                 final List<MvccEntity> entityVersion ) {
             invocationLatch.countDown();
         }
@@ -786,7 +792,7 @@ public class EntityVersionCleanupTaskTest {
 
 
         @Override
-        public void versionDeleted( final CollectionScope scope, final Id entityId, 
+        public void versionDeleted( final CollectionScope scope, final Id entityId,
                 final List<MvccEntity> entityVersion ) {
 
             //wait for unblock to happen before counting down invocation latches

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 16b3ff9..92312d2 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -101,6 +101,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     public EntityIndexBatch index( final IndexScope indexScope, final Entity entity ) {
         IndexValidationUtils.validateIndexScope( indexScope );
         ValidationUtils.verifyEntityWrite( entity );
+        ValidationUtils.verifyVersion( entity.getVersion() );
 
         final String context = createContextName(indexScope);
 
@@ -161,7 +162,6 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         }
 
 
-        log.debug( "De-indexing type {} with documentId '{}'" , entityType, indexId);
         String[] indexes = entityIndex.getIndexes(AliasedEntityIndex.AliasType.Read);
         //get the default index if no alias exists yet
         if(indexes == null ||indexes.length == 0){

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 3a11ec7..16f4b3f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -113,9 +113,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
     private final Timer addWriteAliasTimer;
     private final Timer addReadAliasTimer;
     private final Timer searchTimer;
-    private final Timer allVersionsTimerFuture;
-    private final Timer deletePreviousTimerFuture;
-    private final Meter errorMeter;
 
     /**
      * We purposefully make this per instance. Some indexes may work, while others may fail
@@ -190,16 +187,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
             .getTimer( EsEntityIndexImpl.class, "search.cursor.timer" );
         this.getVersionsTimer =metricsFactory
             .getTimer( EsEntityIndexImpl.class, "get.versions.timer" );
-        this.allVersionsTimer =  metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "delete.all.versions.timer" );
-        this.deletePreviousTimer = metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "delete.previous.versions.timer" );
-        this.allVersionsTimerFuture =  metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "delete.all.versions.timer.future" );
-        this.deletePreviousTimerFuture = metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "delete.previous.versions.timer.future" );
-
-        this.errorMeter = metricsFactory.getMeter(EsEntityIndexImpl.class,"errors");
 
 
         final MapScope mapScope = new MapScopeImpl( appScope.getApplication(), "cursorcache" );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 5259a26..d55073a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -273,7 +273,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
             public void call( final BulkRequestBuilder bulkRequestBuilder ) {
                 sendRequest( bulkRequestBuilder );
             }
-        } ).toBlocking().last();
+        } ).toBlocking().lastOrDefault(null);
 
         //call back all futures
         Observable.from(operationMessages)


[07/11] incubator-usergrid git commit: Merge branch 'USERGRID-466' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-466

Posted by to...@apache.org.
Merge branch 'USERGRID-466' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-466

Conflicts:
	stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9d00c223
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9d00c223
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9d00c223

Branch: refs/heads/USERGRID-466
Commit: 9d00c2235111954abbbcf9e26afacca74df1c1bd
Parents: a1b557d 361060e
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Mar 13 12:23:53 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Mar 13 12:23:53 2015 -0600

----------------------------------------------------------------------
 .../main/groovy/configure_elasticsearch.groovy  |   1 -
 .../persistence/index/guice/QueueProvider.java  | 116 +++++++++++++++++++
 .../usergrid/persistence/queue/QueueScope.java  |   2 +-
 .../persistence/queue/QueueScopeFactory.java    |  34 ------
 .../persistence/queue/guice/QueueModule.java    |  17 ++-
 .../queue/impl/QueueScopeFactoryImpl.java       |  48 --------
 .../persistence/queue/impl/QueueScopeImpl.java  |  27 +----
 .../queue/impl/SQSQueueManagerImpl.java         |   6 +-
 .../persistence/queue/NoAWSCredsRule.java       |  81 +++++++++++++
 .../persistence/queue/QueueManagerTest.java     |  31 ++---
 .../services/notifications/QueueListener.java   |   5 +-
 .../usergrid/services/queues/QueueListener.java |   5 +-
 12 files changed, 232 insertions(+), 141 deletions(-)
----------------------------------------------------------------------



[04/11] incubator-usergrid git commit: add replay strategy

Posted by to...@apache.org.
add replay strategy


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/44148384
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/44148384
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/44148384

Branch: refs/heads/USERGRID-466
Commit: 441483842b14c70bc075437a32cbef52f9460a3c
Parents: a1b557d
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Mar 13 11:29:27 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Mar 13 11:29:27 2015 -0600

----------------------------------------------------------------------
 .../persistence/core/astyanax/ColumnNameIteratorTest.java        | 4 ++--
 .../core/astyanax/MultiKeyColumnNameIteratorTest.java            | 4 ++--
 .../persistence/core/astyanax/MultiRowColumnIteratorTest.java    | 4 ++--
 3 files changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44148384/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
index 0b0cbab..d2c6f89 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
@@ -76,12 +76,12 @@ public class ColumnNameIteratorTest {
         final CassandraConfig cassandraConfig = new CassandraConfig() {
             @Override
             public ConsistencyLevel getReadCL() {
-                return ConsistencyLevel.CL_QUORUM;
+                return ConsistencyLevel.CL_LOCAL_ONE;
             }
 
             @Override
             public ConsistencyLevel getConsistentReadCL() {
-                return ConsistencyLevel.CL_LOCAL_ONE;
+                return ConsistencyLevel.CL_LOCAL_QUORUM;
             }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44148384/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
index 964c04a..f4f6f9c 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
@@ -80,12 +80,12 @@ public class MultiKeyColumnNameIteratorTest {
         final CassandraConfig cassandraConfig = new CassandraConfig() {
             @Override
             public ConsistencyLevel getReadCL() {
-                return ConsistencyLevel.CL_QUORUM;
+                return ConsistencyLevel.CL_LOCAL_ONE;
             }
 
             @Override
             public ConsistencyLevel getConsistentReadCL() {
-                return ConsistencyLevel.CL_LOCAL_ONE;
+                return ConsistencyLevel.CL_LOCAL_QUORUM;
             }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44148384/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
index 11d34a4..c32b820 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
@@ -83,12 +83,12 @@ public class MultiRowColumnIteratorTest {
         final CassandraConfig cassandraConfig = new CassandraConfig() {
             @Override
             public ConsistencyLevel getReadCL() {
-                return ConsistencyLevel.CL_QUORUM;
+                return ConsistencyLevel.CL_LOCAL_ONE;
             }
 
             @Override
             public ConsistencyLevel getConsistentReadCL() {
-                return ConsistencyLevel.CL_LOCAL_ONE;
+                return ConsistencyLevel.CL_LOCAL_QUORUM;
             }
 
 


[05/11] incubator-usergrid git commit: add replay strategy

Posted by to...@apache.org.
add replay strategy


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/117c0f3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/117c0f3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/117c0f3d

Branch: refs/heads/USERGRID-466
Commit: 117c0f3d8839cb1da2cae35b9a1171c1b404ac98
Parents: 4414838
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Mar 13 12:10:55 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Mar 13 12:10:55 2015 -0600

----------------------------------------------------------------------
 .../mvcc/stage/write/WriteUniqueVerify.java     | 94 +++++++++-----------
 1 file changed, 42 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/117c0f3d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index 5f96b92..173efa7 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -101,9 +101,46 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
 
         final CollectionScope scope = ioevent.getEntityCollection();
 
+        final MutationBatch batch = keyspace.prepareMutationBatch();
+        //allocate our max size, worst case
+        final List<Field> uniqueFields = new ArrayList<>( entity.getFields().size() );
+
+        //
+        // Construct all the functions for verifying we're unique
+        //
+        for ( final Field field :  entity.getFields() ) {
+
+            // if it's unique, create a function to validate it and add it to the list of
+            // concurrent validations
+            if ( field.isUnique() ) {
+                // use write-first then read strategy
+                final UniqueValue written = new UniqueValueImpl( field, mvccEntity.getId(), mvccEntity.getVersion() );
+
+                // use TTL in case something goes wrong before entity is finally committed
+                final MutationBatch mb = uniqueValueStrat.write( scope, written, serializationFig.getTimeout() );
+
+                batch.mergeShallow( mb );
+                uniqueFields.add(field);
+            }
+        }
+
+        //short circuit nothing to do
+        if ( uniqueFields.size() == 0 ) {
+            return  ;
+        }
+
+        //perform the write
+        try {
+            batch.execute();
+        }
+        catch ( ConnectionException ex ) {
+            throw new RuntimeException( "Unable to write to cassandra", ex );
+        }
+
         // use simple thread pool to verify fields in parallel
-        ConsistentReplayCommand cmd = new ConsistentReplayCommand(uniqueValueStrat,keyspace,serializationFig,cassandraFig,scope,entity);
+        ConsistentReplayCommand cmd = new ConsistentReplayCommand(uniqueValueStrat,cassandraFig,scope, uniqueFields,entity);
         Map<String,Field>  uniquenessViolations = cmd.execute();
+         cmd.getFailedExecutionException();
         //We have violations, throw an exception
         if ( !uniquenessViolations.isEmpty() ) {
             throw new WriteUniqueVerifyException( mvccEntity, ioevent.getEntityCollection(), uniquenessViolations );
@@ -113,19 +150,17 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
     private static class ConsistentReplayCommand extends HystrixCommand<Map<String,Field>>{
 
         private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
-        private final Keyspace keySpace;
-        private final SerializationFig serializationFig;
         private final CassandraConfig fig;
         private final CollectionScope scope;
+        private final List<Field> uniqueFields;
         private final Entity entity;
 
-        public ConsistentReplayCommand(UniqueValueSerializationStrategy uniqueValueSerializationStrategy,Keyspace keySpace, SerializationFig serializationFig, CassandraConfig fig,CollectionScope scope, Entity entity){
+        public ConsistentReplayCommand(UniqueValueSerializationStrategy uniqueValueSerializationStrategy, CassandraConfig fig, CollectionScope scope, List<Field> uniqueFields, Entity entity){
             super(REPLAY_GROUP);
             this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
-            this.keySpace = keySpace;
-            this.serializationFig = serializationFig;
             this.fig = fig;
             this.scope = scope;
+            this.uniqueFields = uniqueFields;
             this.entity = entity;
         }
 
@@ -142,23 +177,17 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
         public Map<String, Field> executeStrategy(ConsistencyLevel consistencyLevel){
             Collection<Field> entityFields = entity.getFields();
             //allocate our max size, worst case
-            final List<Field> uniqueFields = new ArrayList<Field>( entityFields.size() );
             //now get the set of fields back
             final UniqueValueSet uniqueValues;
-            //todo add consistencylevel and read back if fail using
-
             try {
-
-                uniqueValues = uniqueValueSerializationStrategy.load( scope,consistencyLevel, uniqueFields );
+                uniqueValues = uniqueValueSerializationStrategy.load( scope,consistencyLevel, entityFields );
             }
             catch ( ConnectionException e ) {
                 throw new RuntimeException( "Unable to read from cassandra", e );
             }
 
-
             final Map<String, Field> uniquenessViolations = new HashMap<>( uniqueFields.size() );
 
-
             //loop through each field that was unique
             for ( final Field field : uniqueFields ) {
 
@@ -170,51 +199,12 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
                             field.getName() ) );
                 }
 
-
                 final Id returnedEntityId = uniqueValue.getEntityId();
 
-
                 if ( !entity.getId().equals(returnedEntityId) ) {
                     uniquenessViolations.put( field.getName(), field );
                 }
             }
-            final MutationBatch batch = keySpace.prepareMutationBatch();
-            //
-            // Construct all the functions for verifying we're unique
-            //
-            for ( final Field field :  entity.getFields() ) {
-
-                // if it's unique, create a function to validate it and add it to the list of
-                // concurrent validations
-                if ( field.isUnique() ) {
-
-
-                    // use write-first then read strategy
-                    final UniqueValue written = new UniqueValueImpl( field, entity.getId(), entity.getVersion() );
-
-                    // use TTL in case something goes wrong before entity is finally committed
-                    final MutationBatch mb = uniqueValueSerializationStrategy.write( scope, written, serializationFig.getTimeout() );
-
-                    batch.mergeShallow( mb );
-
-
-                    uniqueFields.add(field);
-                }
-            }
-
-            //short circuit nothing to do
-            if ( uniqueFields.size() == 0 ) {
-                return uniquenessViolations ;
-            }
-
-
-            //perform the write
-            try {
-                batch.execute();
-            }
-            catch ( ConnectionException ex ) {
-                throw new RuntimeException( "Unable to write to cassandra", ex );
-            }
 
             return uniquenessViolations;
         }


[09/11] incubator-usergrid git commit: Merge remote-tracking branch 'origin/USERGRID-466-change-write-consistency' into USERGRID-466

Posted by to...@apache.org.
Merge remote-tracking branch 'origin/USERGRID-466-change-write-consistency' into USERGRID-466

Conflicts:
	stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/205e0e0c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/205e0e0c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/205e0e0c

Branch: refs/heads/USERGRID-466
Commit: 205e0e0c03c176623fe9fa2cc18e6a32878838b5
Parents: 2d6ae36 45271d7
Author: Todd Nine <tn...@apigee.com>
Authored: Sun Mar 15 13:56:33 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Sun Mar 15 13:56:33 2015 -0600

----------------------------------------------------------------------
 .../collection/EntityDeletedFactory.java        |  34 -----
 .../collection/EntityVersionCleanupFactory.java |  35 -----
 .../collection/EntityVersionCreatedFactory.java |  31 -----
 .../impl/EntityVersionTaskFactory.java          |  65 ++++++++++
 .../mvcc/stage/write/WriteUniqueVerify.java     | 130 ++++++++++++-------
 .../UniqueValueSerializationStrategy.java       |  21 ++-
 .../UniqueValueSerializationStrategyImpl.java   |  19 ++-
 .../mvcc/stage/write/WriteUniqueVerifyTest.java |   6 +-
 .../core/astyanax/CassandraConfig.java          |   6 +
 .../core/astyanax/CassandraConfigImpl.java      |   8 +-
 .../persistence/core/astyanax/CassandraFig.java |   9 +-
 .../core/astyanax/ColumnNameIteratorTest.java   |   7 +-
 .../MultiKeyColumnNameIteratorTest.java         |   7 +-
 .../astyanax/MultiRowColumnIteratorTest.java    |   7 +-
 .../index/impl/EsEntityIndexImpl.java           |  31 +++--
 15 files changed, 238 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/205e0e0c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityDeletedFactory.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityDeletedFactory.java
index 3c673ee,3c673ee..0000000
deleted file mode 100644,100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityDeletedFactory.java
+++ /dev/null
@@@ -1,34 -1,34 +1,0 @@@
--/*
-- *
-- *  * Licensed to the Apache Software Foundation (ASF) under one or more
-- *  *  contributor license agreements.  The ASF licenses this file to You
-- *  * under the Apache License, Version 2.0 (the "License"); you may not
-- *  * use this file except in compliance with the License.
-- *  * You may obtain a copy of the License at
-- *  *
-- *  *     http://www.apache.org/licenses/LICENSE-2.0
-- *  *
-- *  * Unless required by applicable law or agreed to in writing, software
-- *  * distributed under the License is distributed on an "AS IS" BASIS,
-- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- *  * See the License for the specific language governing permissions and
-- *  * limitations under the License.  For additional information regarding
-- *  * copyright in this work, please see the NOTICE file in the top level
-- *  * directory of this distribution.
-- *
-- */
--
--package org.apache.usergrid.persistence.collection;
--
--import org.apache.usergrid.persistence.collection.impl.EntityDeletedTask;
--import org.apache.usergrid.persistence.model.entity.Id;
--
--import java.util.UUID;
--
--/**
-- * Creates EntityDeletedTask instances
-- */
--public interface EntityDeletedFactory {
--    public EntityDeletedTask getTask(final CollectionScope collectionScope, final Id entityId, final UUID version );
--
--}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/205e0e0c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityVersionCleanupFactory.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityVersionCleanupFactory.java
index 2232f00,2232f00..0000000
deleted file mode 100644,100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityVersionCleanupFactory.java
+++ /dev/null
@@@ -1,35 -1,35 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- *  contributor license agreements.  The ASF licenses this file to You
-- * under the Apache License, Version 2.0 (the "License"); you may not
-- * use this file except in compliance with the License.
-- * You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.  For additional information regarding
-- * copyright in this work, please see the NOTICE file in the top level
-- * directory of this distribution.
-- */
--package org.apache.usergrid.persistence.collection;
--
--
--
--import org.apache.usergrid.persistence.collection.impl.EntityVersionCleanupTask;
--import org.apache.usergrid.persistence.model.entity.Id;
--
--import java.util.UUID;
--
--
--public interface EntityVersionCleanupFactory {
--
--    public EntityVersionCleanupTask getTask( 
--        final CollectionScope scope, 
--        final Id entityId, 
--        final UUID version );
--
--}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/205e0e0c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityVersionCreatedFactory.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityVersionCreatedFactory.java
index 4248d42,4248d42..0000000
deleted file mode 100644,100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityVersionCreatedFactory.java
+++ /dev/null
@@@ -1,31 -1,31 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *    http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing,
-- * software distributed under the License is distributed on an
-- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- * KIND, either express or implied.  See the License for the
-- * specific language governing permissions and limitations
-- * under the License.
-- */
--package org.apache.usergrid.persistence.collection;
--
--
--import java.util.UUID;
--
--import org.apache.usergrid.persistence.collection.impl.EntityVersionCreatedTask;
--import org.apache.usergrid.persistence.model.entity.Entity;
--import org.apache.usergrid.persistence.model.entity.Id;
--
--public interface EntityVersionCreatedFactory {
--    public EntityVersionCreatedTask getTask( final CollectionScope scope, final Entity entity);
--
--}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/205e0e0c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java
index 0000000,0000000..b41e668
new file mode 100644
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java
@@@ -1,0 -1,0 +1,65 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ *  contributor license agreements.  The ASF licenses this file to You
++ * under the Apache License, Version 2.0 (the "License"); you may not
++ * use this file except in compliance with the License.
++ * You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.  For additional information regarding
++ * copyright in this work, please see the NOTICE file in the top level
++ * directory of this distribution.
++ */
++package org.apache.usergrid.persistence.collection.impl;
++
++
++
++import org.apache.usergrid.persistence.collection.CollectionScope;
++import org.apache.usergrid.persistence.collection.impl.EntityDeletedTask;
++import org.apache.usergrid.persistence.collection.impl.EntityVersionCleanupTask;
++import org.apache.usergrid.persistence.collection.impl.EntityVersionCreatedTask;
++import org.apache.usergrid.persistence.model.entity.Entity;
++import org.apache.usergrid.persistence.model.entity.Id;
++
++import java.util.UUID;
++
++
++public interface EntityVersionTaskFactory {
++
++    /**
++     * Get a task for cleaning up latent entity data.  If includeVersion = true, the passed version will be cleaned up as well
++     * Otherwise this is a V-1 operation
++     *
++     * @param scope
++     * @param entityId
++     * @param version
++     * @param includeVersion
++     * @return
++     */
++    public EntityVersionCleanupTask getCleanupTask( final CollectionScope scope, final Id entityId, final UUID version,
++                                                    final boolean includeVersion );
++
++    /**
++     * Get an entityVersionCreatedTask
++     * @param scope
++     * @param entity
++     * @return
++     */
++    public EntityVersionCreatedTask getCreatedTask( final CollectionScope scope, final Entity entity );
++
++    /**
++     * Get an entity deleted task
++     * @param collectionScope
++     * @param entityId
++     * @param version
++     * @return
++     */
++    public EntityDeletedTask getDeleteTask( final CollectionScope collectionScope, final Id entityId,
++                                            final UUID version );
++
++}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/205e0e0c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 8be044f,9343d39..3a11ec7
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@@ -169,26 -173,36 +172,36 @@@ public class EsEntityIndexImpl implemen
          this.alias = indexIdentifier.getAlias();
          this.failureMonitor = new FailureMonitorImpl( config, provider );
          this.aliasCache = indexCache;
 -        this.metricsFactory = metricsFactory;
          this.addTimer = metricsFactory
-             .getTimer( EsEntityIndexImpl.class, "es.entity.index.add.index.timer" );
+             .getTimer( EsEntityIndexImpl.class, "add.timer" );
          this.removeAliasTimer = metricsFactory
-             .getTimer( EsEntityIndexImpl.class, "es.entity.index.remove.index.alias.timer" );
+             .getTimer( EsEntityIndexImpl.class, "remove.alias.timer" );
          this.addReadAliasTimer = metricsFactory
-             .getTimer( EsEntityIndexImpl.class, "es.entity.index.add.read.alias.timer" );
+             .getTimer( EsEntityIndexImpl.class, "add.read.alias.timer" );
          this.addWriteAliasTimer = metricsFactory
-             .getTimer( EsEntityIndexImpl.class, "es.entity.index.add.write.alias.timer" );
+             .getTimer( EsEntityIndexImpl.class, "add.write.alias.timer" );
          this.mappingTimer = metricsFactory
-             .getTimer( EsEntityIndexImpl.class, "es.entity.index.create.mapping.timer" );
+             .getTimer( EsEntityIndexImpl.class, "create.mapping.timer" );
          this.refreshTimer = metricsFactory
-             .getTimer( EsEntityIndexImpl.class, "es.entity.index.refresh.timer" );
+             .getTimer( EsEntityIndexImpl.class, "refresh.timer" );
          this.searchTimer =metricsFactory
-             .getTimer( EsEntityIndexImpl.class, "es.entity.index.search.timer" );
+             .getTimer( EsEntityIndexImpl.class, "search.timer" );
          this.cursorTimer = metricsFactory
-             .getTimer( EsEntityIndexImpl.class, "es.entity.index.search.cursor.timer" );
+             .getTimer( EsEntityIndexImpl.class, "search.cursor.timer" );
          this.getVersionsTimer =metricsFactory
-             .getTimer( EsEntityIndexImpl.class, "es.entity.index.get.versions.timer" );
+             .getTimer( EsEntityIndexImpl.class, "get.versions.timer" );
+         this.allVersionsTimer =  metricsFactory
+             .getTimer( EsEntityIndexImpl.class, "delete.all.versions.timer" );
+         this.deletePreviousTimer = metricsFactory
+             .getTimer( EsEntityIndexImpl.class, "delete.previous.versions.timer" );
+         this.allVersionsTimerFuture =  metricsFactory
+             .getTimer( EsEntityIndexImpl.class, "delete.all.versions.timer.future" );
+         this.deletePreviousTimerFuture = metricsFactory
+             .getTimer( EsEntityIndexImpl.class, "delete.previous.versions.timer.future" );
+ 
+         this.errorMeter = metricsFactory.getMeter(EsEntityIndexImpl.class,"errors");
  
 +
          final MapScope mapScope = new MapScopeImpl( appScope.getApplication(), "cursorcache" );
  
          mapManager = mapManagerFactory.createMapManager(mapScope);


[11/11] incubator-usergrid git commit: Fixes bug in writeunique verify failover command

Posted by to...@apache.org.
Fixes bug in writeunique verify failover command

Fixes NoAWSCredsRule to use Assumptions to mark as ignored

Adds AWS creds rule to NotificationsServiceTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/03341191
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/03341191
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/03341191

Branch: refs/heads/USERGRID-466
Commit: 03341191a2109b8d297387c37b88df7af7b50218
Parents: f20dfd3
Author: Todd Nine <tn...@apigee.com>
Authored: Sun Mar 15 15:34:45 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Sun Mar 15 15:34:45 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/StaleIndexCleanupTest.java  |  3 ++-
 .../impl/EntityVersionCleanupTask.java          |  7 +++--
 .../mvcc/stage/write/WriteUniqueVerify.java     | 24 +++++++++--------
 .../collection/util/EntityUtils.java            | 28 +++++++++++++++++---
 .../index/impl/EsIndexBufferConsumerImpl.java   | 14 ++++++++--
 .../persistence/queue/NoAWSCredsRule.java       | 17 ++++++++++++
 stack/services/pom.xml                          | 13 ++++++++-
 .../notifications/NotifiersServiceIT.java       |  6 +++++
 8 files changed, 90 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/03341191/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index ac04dcc..4bde50e 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -392,9 +392,10 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
         // wait for indexes to be cleared for the deleted entities
         count = 0;
         do {
+            queryCollectionEm("dogs", "select *");
             Thread.sleep(100);
             crs = queryCollectionCp("dogs", "dog", "select *");
-        } while ( crs.size() == numEntities && count++ < 15 );
+        } while ( crs.size() != numEntities && count++ < 15 );
 
         Assert.assertEquals("Expect candidates without earlier stale entities", crs.size(), numEntities);
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/03341191/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 03a5ff6..65506ec 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -28,6 +28,7 @@ import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.collection.util.EntityUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.field.Field;
 import org.slf4j.Logger;
@@ -158,10 +159,8 @@ public class EntityVersionCleanupTask implements Task<Void> {
                             final Entity entity = mvccEntity.getEntity().get();
 
                             //remove all unique fields from the index
-                            for ( final Field field : entity.getFields() ) {
-                                if ( !field.isUnique() ) {
-                                    continue;
-                                }
+                            for ( final Field field : EntityUtils.getUniqueFields(entity )) {
+
                                 final UniqueValue unique = new UniqueValueImpl( field, entityId, entityVersion );
                                 final MutationBatch deleteMutation =
                                     uniqueValueSerializationStrategy.delete( scope, unique );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/03341191/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index 173efa7..5bdf3b9 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -31,6 +31,8 @@ import com.netflix.hystrix.HystrixCommand;
 import com.netflix.hystrix.HystrixCommandGroupKey;
 import com.netflix.hystrix.HystrixThreadPoolProperties;
 import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle;
+
+import org.apache.usergrid.persistence.collection.util.EntityUtils;
 import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
 import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 import org.slf4j.Logger;
@@ -108,20 +110,21 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
         //
         // Construct all the functions for verifying we're unique
         //
-        for ( final Field field :  entity.getFields() ) {
+
+
+        for ( final Field field : EntityUtils.getUniqueFields(entity)) {
 
             // if it's unique, create a function to validate it and add it to the list of
             // concurrent validations
-            if ( field.isUnique() ) {
-                // use write-first then read strategy
-                final UniqueValue written = new UniqueValueImpl( field, mvccEntity.getId(), mvccEntity.getVersion() );
 
-                // use TTL in case something goes wrong before entity is finally committed
-                final MutationBatch mb = uniqueValueStrat.write( scope, written, serializationFig.getTimeout() );
+            // use write-first then read strategy
+            final UniqueValue written = new UniqueValueImpl( field, mvccEntity.getId(), mvccEntity.getVersion() );
 
-                batch.mergeShallow( mb );
-                uniqueFields.add(field);
-            }
+            // use TTL in case something goes wrong before entity is finally committed
+            final MutationBatch mb = uniqueValueStrat.write( scope, written, serializationFig.getTimeout() );
+
+            batch.mergeShallow( mb );
+            uniqueFields.add(field);
         }
 
         //short circuit nothing to do
@@ -175,12 +178,11 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
         }
 
         public Map<String, Field> executeStrategy(ConsistencyLevel consistencyLevel){
-            Collection<Field> entityFields = entity.getFields();
             //allocate our max size, worst case
             //now get the set of fields back
             final UniqueValueSet uniqueValues;
             try {
-                uniqueValues = uniqueValueSerializationStrategy.load( scope,consistencyLevel, entityFields );
+                uniqueValues = uniqueValueSerializationStrategy.load( scope,consistencyLevel, uniqueFields );
             }
             catch ( ConnectionException e ) {
                 throw new RuntimeException( "Unable to read from cassandra", e );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/03341191/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
index 6233075..cf964a3 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
@@ -1,8 +1,11 @@
 package org.apache.usergrid.persistence.collection.util;
 
 
-import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
 import java.util.UUID;
+import org.apache.usergrid.persistence.model.field.Field;
 
 import org.apache.commons.lang3.reflect.FieldUtils;
 
@@ -16,9 +19,9 @@ import org.apache.usergrid.persistence.model.entity.Id;
 public class EntityUtils {
 
 
-    private static final Field VERSION = FieldUtils.getField( Entity.class, "version", true );
+    private static final java.lang.reflect.Field VERSION = FieldUtils.getField( Entity.class, "version", true );
 
-    private static final Field ID = FieldUtils.getField( Entity.class, "id", true );
+    private static final java.lang.reflect.Field ID = FieldUtils.getField( Entity.class, "id", true );
 
 
     /**
@@ -46,4 +49,23 @@ public class EntityUtils {
             throw new RuntimeException( "Unable to set the field " + ID + " into the entity", e );
         }
     }
+
+
+    /**
+     * Get the unique fields for an entity
+     * @param entity
+     * @return
+     */
+    public static List<Field> getUniqueFields( final Entity entity ){
+
+        final List<Field> uniqueFields = new ArrayList<>(entity.getFields().size());
+
+        for(final Field field: entity.getFields()){
+            if(field.isUnique()){
+                uniqueFields.add( field);
+            }
+        }
+
+        return uniqueFields;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/03341191/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index d55073a..b31cf39 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -322,8 +322,18 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
 
         for (BulkItemResponse response : responses) {
             if (response.isFailed()) {
-                throw new RuntimeException("Unable to index documents.  Errors are :"
-                    + response.getFailure().getMessage());
+
+                final BulkItemResponse.Failure failure = response.getFailure();
+
+                final String message;
+
+                if(failure != null) {
+                    message =  "Unable to index documents.  Errors are :" + response.getFailure().getMessage();
+                }else{
+                    message =  "Unable to index documents.  Response is :" + response.getResponse();
+                }
+
+                throw new RuntimeException(message);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/03341191/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/NoAWSCredsRule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/NoAWSCredsRule.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/NoAWSCredsRule.java
index 45218c2..ba0dc1f 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/NoAWSCredsRule.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/NoAWSCredsRule.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.persistence.queue;
 
 
+import org.junit.Assume;
+import org.junit.internal.runners.model.MultipleFailureException;
 import org.junit.rules.TestRule;
 import org.junit.runner.Description;
 import org.junit.runners.model.Statement;
@@ -49,6 +51,9 @@ public class NoAWSCredsRule implements TestRule {
                         throw t;
                     }
 
+                    //do this so our test gets marked as ignored.  Not pretty, but it works
+                    Assume.assumeTrue( false );
+
 
                 }
             }
@@ -69,6 +74,18 @@ public class NoAWSCredsRule implements TestRule {
             }
         }
 
+        /**
+         * Handle the multiple failure junit trace
+         */
+        if( t instanceof MultipleFailureException ){
+            for(final Throwable failure : ((MultipleFailureException)t).getFailures()){
+                final boolean isMissingCreds = isMissingCredsException( failure );
+
+                if(isMissingCreds){
+                    return true;
+                }
+            }
+        }
         final Throwable cause = t.getCause();
 
         if ( cause == null ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/03341191/stack/services/pom.xml
----------------------------------------------------------------------
diff --git a/stack/services/pom.xml b/stack/services/pom.xml
index f5e8dd5..68f1a4a 100644
--- a/stack/services/pom.xml
+++ b/stack/services/pom.xml
@@ -133,6 +133,7 @@
       <scope>runtime</scope>
     </dependency>
 
+
     <dependency>
       <groupId>org.apache.usergrid</groupId>
       <artifactId>usergrid-core</artifactId>
@@ -290,7 +291,15 @@
       <classifier>tests</classifier>
     </dependency>
 
-    <dependency>
+      <dependency>
+          <groupId>${project.parent.groupId}</groupId>
+          <artifactId>queue</artifactId>
+          <version>${project.version}</version>
+          <type>test-jar</type>
+          <scope>test</scope>
+      </dependency>
+
+      <dependency>
       <groupId>org.apache.usergrid</groupId>
       <artifactId>usergrid-config</artifactId>
       <version>${project.version}</version>
@@ -311,6 +320,8 @@
       <scope>test</scope>
     </dependency>
 
+
+
     <dependency>
       <groupId>org.springframework</groupId>
       <artifactId>spring-test</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/03341191/stack/services/src/test/java/org/apache/usergrid/services/notifications/NotifiersServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/NotifiersServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/NotifiersServiceIT.java
index 3c1c645..e6970b5 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/NotifiersServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/NotifiersServiceIT.java
@@ -17,12 +17,15 @@
 package org.apache.usergrid.services.notifications;
 
 import org.apache.commons.io.IOUtils;
+
+import org.apache.usergrid.persistence.queue.NoAWSCredsRule;
 import org.apache.usergrid.services.notifications.apns.MockSuccessfulProviderAdapter;
 import org.apache.usergrid.persistence.entities.Notifier;
 
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 import org.apache.usergrid.services.notifications.ConnectionException;
 import org.apache.usergrid.services.notifications.NotificationsService;
@@ -48,6 +51,9 @@ public class NotifiersServiceIT extends AbstractServiceIT {
 
     private QueueListener listener;
 
+    @Rule
+    public NoAWSCredsRule noCredsRule = new NoAWSCredsRule();
+
 
     @Before
     public void before() throws Exception {


[03/11] incubator-usergrid git commit: add replay strategy

Posted by to...@apache.org.
add replay strategy


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a1b557d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a1b557d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a1b557d0

Branch: refs/heads/USERGRID-466
Commit: a1b557d073fd8dbe4c20f949ce7497db01d7cea5
Parents: fde59b7
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Mar 13 11:25:52 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Mar 13 11:25:52 2015 -0600

----------------------------------------------------------------------
 .../mvcc/stage/write/WriteUniqueVerify.java     | 167 ++++++++++++-------
 .../mvcc/stage/write/WriteUniqueVerifyTest.java |   6 +-
 .../persistence/core/astyanax/CassandraFig.java |   4 +-
 3 files changed, 111 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a1b557d0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index 1c30e75..5f96b92 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -25,7 +25,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import com.netflix.astyanax.model.ConsistencyLevel;
+import com.netflix.hystrix.Hystrix;
+import com.netflix.hystrix.HystrixCommand;
+import com.netflix.hystrix.HystrixCommandGroupKey;
+import com.netflix.hystrix.HystrixThreadPoolProperties;
 import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle;
+import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,12 +73,14 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
     protected final SerializationFig serializationFig;
 
     protected final Keyspace keyspace;
+    private final CassandraConfig cassandraFig;
 
 
     @Inject
     public WriteUniqueVerify( final UniqueValueSerializationStrategy uniqueValueSerializiationStrategy,
-                              final SerializationFig serializationFig, final Keyspace keyspace ) {
+                              final SerializationFig serializationFig, final Keyspace keyspace, final CassandraConfig cassandraFig ) {
         this.keyspace = keyspace;
+        this.cassandraFig = cassandraFig;
 
         Preconditions.checkNotNull( uniqueValueSerializiationStrategy, "uniqueValueSerializationStrategy is required" );
         Preconditions.checkNotNull( serializationFig, "serializationFig is required" );
@@ -88,100 +97,134 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
 
         final MvccEntity mvccEntity = ioevent.getEvent();
 
-        final Id entityId = mvccEntity.getId();
-
-        final UUID entityVersion = mvccEntity.getVersion();
-
-
         final Entity entity = mvccEntity.getEntity().get();
 
-
         final CollectionScope scope = ioevent.getEntityCollection();
 
         // use simple thread pool to verify fields in parallel
+        ConsistentReplayCommand cmd = new ConsistentReplayCommand(uniqueValueStrat,keyspace,serializationFig,cassandraFig,scope,entity);
+        Map<String,Field>  uniquenessViolations = cmd.execute();
+        //We have violations, throw an exception
+        if ( !uniquenessViolations.isEmpty() ) {
+            throw new WriteUniqueVerifyException( mvccEntity, ioevent.getEntityCollection(), uniquenessViolations );
+        }
+    }
 
-        final Collection<Field> entityFields = entity.getFields();
+    private static class ConsistentReplayCommand extends HystrixCommand<Map<String,Field>>{
+
+        private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+        private final Keyspace keySpace;
+        private final SerializationFig serializationFig;
+        private final CassandraConfig fig;
+        private final CollectionScope scope;
+        private final Entity entity;
+
+        public ConsistentReplayCommand(UniqueValueSerializationStrategy uniqueValueSerializationStrategy,Keyspace keySpace, SerializationFig serializationFig, CassandraConfig fig,CollectionScope scope, Entity entity){
+            super(REPLAY_GROUP);
+            this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
+            this.keySpace = keySpace;
+            this.serializationFig = serializationFig;
+            this.fig = fig;
+            this.scope = scope;
+            this.entity = entity;
+        }
 
-        //allocate our max size, worst case
-        final List<Field> uniqueFields = new ArrayList<>( entityFields.size() );
+        @Override
+        protected Map<String, Field> run() throws Exception {
+            return executeStrategy(fig.getReadCL());
+        }
 
-        final MutationBatch batch = keyspace.prepareMutationBatch();
-        //
-        // Construct all the functions for verifying we're unique
-        //
-        for ( final Field field : entityFields ) {
+        @Override
+        protected Map<String, Field> getFallback() {
+            return executeStrategy(fig.getConsistentReadCL());
+        }
 
-            // if it's unique, create a function to validate it and add it to the list of
-            // concurrent validations
-            if ( field.isUnique() ) {
+        public Map<String, Field> executeStrategy(ConsistencyLevel consistencyLevel){
+            Collection<Field> entityFields = entity.getFields();
+            //allocate our max size, worst case
+            final List<Field> uniqueFields = new ArrayList<Field>( entityFields.size() );
+            //now get the set of fields back
+            final UniqueValueSet uniqueValues;
+            //todo add consistencylevel and read back if fail using
 
+            try {
 
-                // use write-first then read strategy
-                final UniqueValue written = new UniqueValueImpl( field, entityId, entityVersion );
+                uniqueValues = uniqueValueSerializationStrategy.load( scope,consistencyLevel, uniqueFields );
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to read from cassandra", e );
+            }
 
-                // use TTL in case something goes wrong before entity is finally committed
-                final MutationBatch mb = uniqueValueStrat.write( scope, written, serializationFig.getTimeout() );
 
-                batch.mergeShallow( mb );
+            final Map<String, Field> uniquenessViolations = new HashMap<>( uniqueFields.size() );
 
 
-                uniqueFields.add( field );
-            }
-        }
+            //loop through each field that was unique
+            for ( final Field field : uniqueFields ) {
 
-        //short circuit nothing to do
-        if ( uniqueFields.size() == 0 ) {
-            return;
-        }
+                final UniqueValue uniqueValue = uniqueValues.getValue( field.getName() );
 
+                if ( uniqueValue == null ) {
+                    throw new RuntimeException(
+                        String.format( "Could not retrieve unique value for field %s, unable to verify",
+                            field.getName() ) );
+                }
 
-        //perform the write
-        try {
-            batch.execute();
-        }
-        catch ( ConnectionException ex ) {
-            throw new RuntimeException( "Unable to write to cassandra", ex );
-        }
 
+                final Id returnedEntityId = uniqueValue.getEntityId();
 
-        //now get the set of fields back
-        final UniqueValueSet uniqueValues;
 
-        try {
-            uniqueValues = uniqueValueStrat.load( scope, uniqueFields );
-        }
-        catch ( ConnectionException e ) {
-            throw new RuntimeException( "Unable to read from cassandra", e );
-        }
+                if ( !entity.getId().equals(returnedEntityId) ) {
+                    uniquenessViolations.put( field.getName(), field );
+                }
+            }
+            final MutationBatch batch = keySpace.prepareMutationBatch();
+            //
+            // Construct all the functions for verifying we're unique
+            //
+            for ( final Field field :  entity.getFields() ) {
 
+                // if it's unique, create a function to validate it and add it to the list of
+                // concurrent validations
+                if ( field.isUnique() ) {
 
-        final Map<String, Field> uniquenessViolations = new HashMap<>( uniqueFields.size() );
 
+                    // use write-first then read strategy
+                    final UniqueValue written = new UniqueValueImpl( field, entity.getId(), entity.getVersion() );
 
-        //loop through each field that was unique
-        for ( final Field field : uniqueFields ) {
+                    // use TTL in case something goes wrong before entity is finally committed
+                    final MutationBatch mb = uniqueValueSerializationStrategy.write( scope, written, serializationFig.getTimeout() );
 
-            final UniqueValue uniqueValue = uniqueValues.getValue( field.getName() );
+                    batch.mergeShallow( mb );
 
-            if ( uniqueValue == null ) {
-                throw new RuntimeException(
-                        String.format( "Could not retrieve unique value for field %s, unable to verify",
-                                field.getName() ) );
-            }
 
+                    uniqueFields.add(field);
+                }
+            }
 
-            final Id returnedEntityId = uniqueValue.getEntityId();
+            //short circuit nothing to do
+            if ( uniqueFields.size() == 0 ) {
+                return uniquenessViolations ;
+            }
 
 
-            if ( !entityId.equals( returnedEntityId ) ) {
-                uniquenessViolations.put( field.getName(), field );
+            //perform the write
+            try {
+                batch.execute();
+            }
+            catch ( ConnectionException ex ) {
+                throw new RuntimeException( "Unable to write to cassandra", ex );
             }
-        }
-
 
-        //We have violations, throw an exception
-        if ( !uniquenessViolations.isEmpty() ) {
-            throw new WriteUniqueVerifyException( mvccEntity, ioevent.getEntityCollection(), uniquenessViolations );
+            return uniquenessViolations;
         }
     }
+
+    /**
+     * Command group used for realtime user commands
+     */
+    public static final HystrixCommand.Setter
+        REPLAY_GROUP = HystrixCommand.Setter.withGroupKey(
+            HystrixCommandGroupKey.Factory.asKey( "user" ) ).andThreadPoolPropertiesDefaults(
+                HystrixThreadPoolProperties.Setter().withCoreSize( 1000 ) );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a1b557d0/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
index 4ebfd24..51cb198 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
@@ -18,6 +18,7 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.write;
 
 
+import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
 import org.apache.usergrid.persistence.core.test.UseModules;
 import org.junit.Rule;
 import org.junit.Test;
@@ -64,7 +65,8 @@ public class WriteUniqueVerifyTest {
     @Inject
     private SerializationFig fig;
 
-
+    @Inject
+    private CassandraConfig cassandraConfig;
 
 
     @Test
@@ -81,7 +83,7 @@ public class WriteUniqueVerifyTest {
         final MvccEntity mvccEntity = fromEntity( entity );
 
         // run the stage
-        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace );
+        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace,cassandraConfig );
 
        newStage.call(
             new CollectionIoEvent<>( collectionScope, mvccEntity ) ) ;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a1b557d0/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
index 66a716c..a907702 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
@@ -79,11 +79,11 @@ public interface CassandraFig extends GuicyFig {
     @Default( "false" )
     boolean isEmbedded();
 
-    @Default("CL_ONE")
+    @Default("CL_LOCAL_ONE")
     @Key(READ_CL)
     String getReadCL();
 
-    @Default("CL_LOCAL_ONE")
+    @Default("CL_LOCAL_QUORUM")
     @Key(READ_CONSISTENT_CL)
     String getConsistentReadCL();
 


[06/11] incubator-usergrid git commit: add comment

Posted by to...@apache.org.
add comment


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8cea08db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8cea08db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8cea08db

Branch: refs/heads/USERGRID-466
Commit: 8cea08db37eafede2dbe3c7a8cd1fd38dc0d548a
Parents: 117c0f3
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Mar 13 12:21:27 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Mar 13 12:21:27 2015 -0600

----------------------------------------------------------------------
 .../collection/serialization/UniqueValueSerializationStrategy.java  | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8cea08db/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
index 8d314d8..60275d9 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
@@ -64,6 +64,7 @@ public interface UniqueValueSerializationStrategy extends Migration {
      * Load UniqueValue that matches field from collection or null if that value does not exist.
      *
      * @param colScope Collection scope in which to look for field name/value
+     * @param consistencyLevel Consistency level of query
      * @param fields Field name/value to search for
      * @return UniqueValueSet containing fields from the collection that exist in cassandra
      * @throws ConnectionException on error connecting to Cassandra


[08/11] incubator-usergrid git commit: Merge branch 'USERGRID-466' into USERGRID-466-change-write-consistency

Posted by to...@apache.org.
Merge branch 'USERGRID-466' into USERGRID-466-change-write-consistency


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/45271d7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/45271d7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/45271d7a

Branch: refs/heads/USERGRID-466
Commit: 45271d7a26ad61063a13307ad567882afe387acc
Parents: 8cea08d 9d00c22
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Mar 13 12:24:16 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Mar 13 12:24:16 2015 -0600

----------------------------------------------------------------------
 .../main/groovy/configure_elasticsearch.groovy  |   1 -
 .../persistence/index/guice/QueueProvider.java  | 116 +++++++++++++++++++
 .../usergrid/persistence/queue/QueueScope.java  |   2 +-
 .../persistence/queue/QueueScopeFactory.java    |  34 ------
 .../persistence/queue/guice/QueueModule.java    |  17 ++-
 .../queue/impl/QueueScopeFactoryImpl.java       |  48 --------
 .../persistence/queue/impl/QueueScopeImpl.java  |  27 +----
 .../queue/impl/SQSQueueManagerImpl.java         |   6 +-
 .../persistence/queue/NoAWSCredsRule.java       |  81 +++++++++++++
 .../persistence/queue/QueueManagerTest.java     |  31 ++---
 .../services/notifications/QueueListener.java   |   5 +-
 .../usergrid/services/queues/QueueListener.java |   5 +-
 12 files changed, 232 insertions(+), 141 deletions(-)
----------------------------------------------------------------------