You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/07/26 18:13:14 UTC

[47/50] [abbrv] usergrid git commit: Merge branch 'master' into datastax-cass-driver Fix issue with UniqueValueSerialization backwards compatibility via CQL and legacy Usergrid data.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
index 185cfb7,3dbf1ec..c4c083f
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
@@@ -23,8 -23,7 +23,10 @@@ import java.util.Collections
  import java.util.Iterator;
  import java.util.UUID;
  
 -import com.netflix.astyanax.model.ConsistencyLevel;
 +import com.datastax.driver.core.BatchStatement;
++import com.datastax.driver.core.ConsistencyLevel;
 +import com.datastax.driver.core.Session;
++
  import org.junit.Assert;
  import org.junit.Before;
  import org.junit.Rule;
@@@ -49,7 -47,7 +50,6 @@@ import org.apache.usergrid.persistence.
  import org.apache.usergrid.persistence.model.util.UUIDGenerator;
  
  import com.google.inject.Inject;
--import com.netflix.astyanax.MutationBatch;
  import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
  
  import static org.junit.Assert.assertEquals;
@@@ -365,4 -341,232 +365,229 @@@ public abstract class UniqueValueSerial
  
      }
  
+     /**
+      * Test that inserting duplicates always show the oldest entity UUID being returned (versions of that OK to change).
+      *
+      * @throws ConnectionException
+      * @throws InterruptedException
+      */
+     @Test
+     public void testWritingDuplicates() throws ConnectionException, InterruptedException {
+ 
+         ApplicationScope scope =
+             new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ 
+         IntegerField field = new IntegerField( "count", 5 );
+         Id entityId1 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+         Id entityId2 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+ 
+ 
+ 
+         UUID version1 = UUIDGenerator.newTimeUUID();
+         UUID version2 = UUIDGenerator.newTimeUUID();
+ 
+         UniqueValue stored1 = new UniqueValueImpl( field, entityId1, version2 );
+         UniqueValue stored2 = new UniqueValueImpl( field, entityId2,  version1 );
+ 
+ 
 -        strategy.write( scope, stored1 ).execute();
 -        strategy.write( scope, stored2 ).execute();
++        session.execute(strategy.writeCQL( scope, stored1, -1 ));
++        session.execute(strategy.writeCQL( scope, stored2, -1 ));
+ 
+         // load descending to get the older version of entity for this unique value
 -        UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
++        UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM,
+             entityId1.getType(), Collections.<Field>singleton( field ), true);
+ 
+         UniqueValue retrieved = fields.getValue( field.getName() );
+ 
+         // validate that the first entity UUID is returned after inserting a duplicate mapping
+         assertEquals( stored1, retrieved );
+ 
+ 
+ 
+         UUID version3 = UUIDGenerator.newTimeUUID();
+         UniqueValue stored3 = new UniqueValueImpl( field, entityId2, version3);
 -        strategy.write( scope, stored3 ).execute();
++        session.execute(strategy.writeCQL( scope, stored3, -1 ));
+ 
+         // load the values again, we should still only get back the original unique value
 -        fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
++        fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM,
+             entityId1.getType(), Collections.<Field>singleton( field ), true);
+ 
+         retrieved = fields.getValue( field.getName() );
+ 
+         // validate that the first entity UUID is still returned after inserting duplicate with newer version
+         assertEquals( stored1, retrieved );
+ 
+ 
+         UUID version4 = UUIDGenerator.newTimeUUID();
+         UniqueValue stored4 = new UniqueValueImpl( field, entityId1, version4);
 -        strategy.write( scope, stored4 ).execute();
++        session.execute(strategy.writeCQL( scope, stored4, -1 ));
+ 
+         // load the values again, now we should get the latest version of the original UUID written
 -        fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
++        fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM,
+             entityId1.getType(), Collections.<Field>singleton( field ), true);
+ 
+         retrieved = fields.getValue( field.getName() );
+ 
+         // validate that the first entity UUID is still returned, but with the latest version
+         assertEquals( stored4, retrieved );
+ 
+     }
+ 
+     /**
+      * Test that inserting multiple versions of the same entity UUID result in the latest version being returned.
+      *
+      * @throws ConnectionException
+      * @throws InterruptedException
+      */
+     @Test
+     public void testMultipleVersionsSameEntity() throws ConnectionException, InterruptedException {
+ 
+         ApplicationScope scope =
+             new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ 
+         IntegerField field = new IntegerField( "count", 5 );
+         Id entityId1 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+ 
+ 
+ 
+         UUID version1 = UUIDGenerator.newTimeUUID();
+         UUID version2 = UUIDGenerator.newTimeUUID();
+ 
+         UniqueValue stored1 = new UniqueValueImpl( field, entityId1, version1 );
+         UniqueValue stored2 = new UniqueValueImpl( field, entityId1,  version2 );
+ 
+ 
 -        strategy.write( scope, stored1 ).execute();
 -        strategy.write( scope, stored2 ).execute();
++        session.execute(strategy.writeCQL( scope, stored1, -1 ));
++        session.execute(strategy.writeCQL( scope, stored2, -1 ));
+ 
+         // load descending to get the older version of entity for this unique value
 -        UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
++        UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM,
+             entityId1.getType(), Collections.<Field>singleton( field ), true);
+ 
+         UniqueValue retrieved = fields.getValue( field.getName() );
+         Assert.assertNotNull( retrieved );
+         assertEquals( stored2, retrieved );
+ 
+ 
+     }
+ 
+     @Test
+     public void testDuplicateEntitiesDescending() throws ConnectionException, InterruptedException {
+ 
+         ApplicationScope scope =
+             new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ 
+         IntegerField field = new IntegerField( "count", 5 );
+         Id entityId1 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+         Id entityId2 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+         Id entityId3 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+ 
+ 
+ 
+         UUID version1 = UUIDGenerator.newTimeUUID();
+         UUID version2 = UUIDGenerator.newTimeUUID();
+         UUID version3 = UUIDGenerator.newTimeUUID();
+ 
+         UniqueValue stored1 = new UniqueValueImpl( field, entityId3, version1 );
+         UniqueValue stored2 = new UniqueValueImpl( field, entityId2,  version2 );
+         UniqueValue stored3 = new UniqueValueImpl( field, entityId1,  version3 );
+ 
+ 
 -        strategy.write( scope, stored1 ).execute();
 -        strategy.write( scope, stored2 ).execute();
 -        strategy.write( scope, stored3 ).execute();
++        session.execute(strategy.writeCQL( scope, stored1, -1 ));
++        session.execute(strategy.writeCQL( scope, stored2, -1 ));
++        session.execute(strategy.writeCQL( scope, stored3, -1 ));
+ 
+ 
+         // load descending to get the older version of entity for this unique value
 -        UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
++        UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM,
+             entityId1.getType(), Collections.<Field>singleton( field ), true);
+ 
+ 
 -        fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
 -            entityId1.getType(), Collections.<Field>singleton( field ), false);
 -
+         UniqueValue retrieved = fields.getValue( field.getName() );
+         assertEquals( stored3, retrieved );
+ 
+ 
+     }
+ 
+     @Test
+     public void testDuplicateEntitiesAscending() throws ConnectionException, InterruptedException {
+ 
+         ApplicationScope scope =
+             new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ 
+         IntegerField field = new IntegerField( "count", 5 );
+         Id entityId1 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+         Id entityId2 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+         Id entityId3 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+ 
+ 
+ 
+         UUID version1 = UUIDGenerator.newTimeUUID();
+         UUID version2 = UUIDGenerator.newTimeUUID();
+         UUID version3 = UUIDGenerator.newTimeUUID();
+ 
+         UniqueValue stored1 = new UniqueValueImpl( field, entityId1, version1 );
+         UniqueValue stored2 = new UniqueValueImpl( field, entityId2,  version2 );
+         UniqueValue stored3 = new UniqueValueImpl( field, entityId3,  version3 );
+ 
+ 
 -        strategy.write( scope, stored1 ).execute();
 -        strategy.write( scope, stored2 ).execute();
 -        strategy.write( scope, stored3 ).execute();
++        session.execute(strategy.writeCQL( scope, stored1, -1 ));
++        session.execute(strategy.writeCQL( scope, stored2, -1 ));
++        session.execute(strategy.writeCQL( scope, stored3, -1 ));
+ 
+ 
+         // load descending to get the older version of entity for this unique value
+         UniqueValueSet fields = strategy.load( scope,
 -            ConsistencyLevel.CL_LOCAL_QUORUM, entityId1.getType(), Collections.<Field>singleton( field ), true);
++            ConsistencyLevel.LOCAL_QUORUM, entityId1.getType(), Collections.<Field>singleton( field ), true);
+ 
+         UniqueValue retrieved = fields.getValue( field.getName() );
+         assertEquals( stored1, retrieved );
+ 
+ 
+     }
+ 
+     @Test
+     public void testMixedDuplicates() throws ConnectionException, InterruptedException {
+ 
+         ApplicationScope scope =
+             new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ 
+         IntegerField field = new IntegerField( "count", 5 );
+         Id entityId1 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+         Id entityId2 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+         Id entityId3 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+ 
+ 
+ 
+         UUID version1 = UUIDGenerator.newTimeUUID();
+         UUID version2 = UUIDGenerator.newTimeUUID();
+         UUID version3 = UUIDGenerator.newTimeUUID();
+         UUID version4 = UUIDGenerator.newTimeUUID();
+         UUID version5 = UUIDGenerator.newTimeUUID();
+ 
+         UniqueValue stored1 = new UniqueValueImpl( field, entityId1, version5 );
+         UniqueValue stored2 = new UniqueValueImpl( field, entityId2,  version4 );
+         UniqueValue stored3 = new UniqueValueImpl( field, entityId1, version3 );
+         UniqueValue stored4 = new UniqueValueImpl( field, entityId3,  version2 );
+         UniqueValue stored5 = new UniqueValueImpl( field, entityId3,  version1 );
+ 
+ 
+ 
 -        strategy.write( scope, stored1 ).execute();
 -        strategy.write( scope, stored2 ).execute();
 -        strategy.write( scope, stored3 ).execute();
 -        strategy.write( scope, stored4 ).execute();
 -        strategy.write( scope, stored5 ).execute();
++        session.execute(strategy.writeCQL( scope, stored1, -1 ));
++        session.execute(strategy.writeCQL( scope, stored2, -1 ));
++        session.execute(strategy.writeCQL( scope, stored3, -1 ));
++        session.execute(strategy.writeCQL( scope, stored4, -1 ));
++        session.execute(strategy.writeCQL( scope, stored5, -1 ));
+ 
+ 
+         // load descending to get the older version of entity for this unique value
 -        UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
++        UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM,
+             entityId1.getType(), Collections.<Field>singleton( field ), true);
+ 
+         UniqueValue retrieved = fields.getValue( field.getName() );
+         assertEquals( stored1, retrieved );
+ 
+ 
+     }
+ 
  }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties
index 92d0041,d404b1e..ebae735
--- a/stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties
+++ b/stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties
@@@ -1,7 -1,26 +1,25 @@@
+ # Licensed to the Apache Software Foundation (ASF) under one
+ # or more contributor license agreements.  See the NOTICE file
+ # distributed with this work for additional information
+ # regarding copyright ownership.  The ASF licenses this file
+ # to you under the Apache License, Version 2.0 (the
+ # "License"); you may not use this file except in compliance
+ # with the License.  You may obtain a copy of the License at
+ #
+ #     http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing,
+ # software distributed under the License is distributed on an
+ # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ # KIND, either express or implied.  See the License for the
+ # specific language governing permissions and limitations
+ # under the License.
+ #
+ 
  # These are for CHOP environment settings
  
 -cassandra.connections=20
 +cassandra.connections=50
  cassandra.port=9160
 -cassandra.version=1.2
  
  # a comma delimited private IP address list to your chop cassandra cluster
  # define this in your settings.xml and have it as an always active profile

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/test/resources/usergrid-UNIT.properties
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/common/pom.xml
----------------------------------------------------------------------
diff --cc stack/corepersistence/common/pom.xml
index c389a5b,bbcadff..63d339b
--- a/stack/corepersistence/common/pom.xml
+++ b/stack/corepersistence/common/pom.xml
@@@ -57,9 -73,9 +73,13 @@@
              <version>${cassandra.version}</version>
              <exclusions>
                  <exclusion>
 +                    <groupId>net.jpountz.lz4</groupId>
 +                    <artifactId>*</artifactId>
 +                </exclusion>
++                <exclusion>
+                     <artifactId>netty</artifactId>
+                     <groupId>io.netty</groupId>
+                 </exclusion>
              </exclusions>
          </dependency>
  

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
index 3c58dfb,0000000..2996465
mode 100644,000000..100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
@@@ -1,225 -1,0 +1,239 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.usergrid.persistence.core;
 +
 +
 +import org.safehaus.guicyfig.Default;
 +import org.safehaus.guicyfig.FigSingleton;
 +import org.safehaus.guicyfig.GuicyFig;
 +import org.safehaus.guicyfig.Key;
 +
 +
 +/**
 + * Cassandra configuration interface.
 + */
 +@FigSingleton
 +public interface CassandraFig extends GuicyFig {
 +
 +    // cassndra properties used by datastax driver
 +    String READ_CL = "cassandra.readcl";
 +    String READ_CL_CONSISTENT = "cassandra.readcl.consistent";
 +    String WRITE_CL = "cassandra.writecl";
 +    String STRATEGY = "cassandra.strategy";
 +    String STRATEGY_OPTIONS = "cassandra.strategy.options";
 +
 +    // main application cassandra properties
 +    String ASTYANAX_READ_CONSISTENT_CL = "usergrid.consistent.read.cl";
 +    String ASTYANAX_READ_CL = "usergrid.read.cl";
 +    String ASTYANAX_WRITE_CL = "usergrid.write.cl";
 +    String SHARD_VALUES = "cassandra.shardvalues";
 +    String THRIFT_TRANSPORT_SIZE = "cassandra.thrift.transport.frame";
 +    String USERNAME = "cassandra.username";
 +    String PASSWORD = "cassandra.password";
 +
 +    // locks cassandra properties
 +    String LOCKS_KEYSPACE_NAME = "cassandra.lock.keyspace";
 +    String LOCKS_KEYSPACE_REPLICATION = "cassandra.lock.keyspace.replication";
 +    String LOCKS_KEYSPACE_STRATEGY = "cassandra.lock.keyspace.strategy";
 +    String LOCKS_CL = "cassandra.lock.cl";
 +    String LOCKS_SHARED_POOL_FLAG = "cassandra.lock.use_shared_pool";
 +    String LOCKS_CONNECTIONS = "cassandra.lock.connections";
 +    String LOCKS_EXPIRATION = "cassandra.lock.expiration.milliseconds";
 +
- 
- 
++    String LOCK_MANAGER_INIT_RETRIES = "cassandra.lock.init.retries";
++    String LOCK_MANAGER_INIT_INTERVAL = "cassandra.lock.init.interval";
 +
 +    // re-usable default values
 +    String DEFAULT_CONNECTION_POOLSIZE = "15";
 +    String DEFAULT_LOCKS_EXPIRATION = "3600000";  // 1 hour
 +    String DEFAULT_LOCAL_DC = "";
 +    String DEFAULT_USERNAME = "";
 +    String DEFAULT_PASSWORD = "";
 +
 +
 +    @Key( "cassandra.hosts" )
 +    String getHosts();
 +
 +    /**
 +     * Valid options are 1.2, 2.0, 2.1
 +     *
 +     * @return
 +     */
 +    @Key( "cassandra.version" )
 +    @Default( "2.1" )
 +    String getVersion();
 +
 +    @Key( "cassandra.cluster_name" )
 +    @Default( "Usergrid" )
 +    String getClusterName();
 +
 +    @Key( "cassandra.keyspace.application" )
 +    @Default( "Usergrid_Applications" )
 +    String getApplicationKeyspace();
 +
 +    @Key( "cassandra.port" )
 +    @Default( "9160" )
 +    int getThriftPort();
 +
 +    @Key( USERNAME )
 +    @Default( DEFAULT_USERNAME )
 +    String getUsername();
 +
 +    @Key( PASSWORD )
 +    @Default( DEFAULT_PASSWORD )
 +    String getPassword();
 +
 +    @Key( "cassandra.datacenter.local" )
 +    @Default( DEFAULT_LOCAL_DC )
 +    String getLocalDataCenter();
 +
 +    @Key( "cassandra.connections" )
 +    @Default( DEFAULT_CONNECTION_POOLSIZE )
 +    int getConnections();
 +
 +    @Key( "cassandra.timeout" )
 +    @Default( "10000" )
 +    int getTimeout();
 +
 +    @Key( "cassandra.timeout.pool" )
 +    @Default( "5000" )
 +    int getPoolTimeout();
 +
 +    @Key("cassandra.discovery")
 +    @Default( "RING_DESCRIBE" )
 +    String getDiscoveryType();
 +
 +
 +    @Default("CL_LOCAL_QUORUM")
 +    @Key(ASTYANAX_READ_CL)
 +    String getAstyanaxReadCL();
 +
 +    @Default("CL_QUORUM")
 +    @Key(ASTYANAX_READ_CONSISTENT_CL)
 +    String getAstyanaxConsistentReadCL();
 +
 +    @Default("CL_LOCAL_QUORUM")
 +    @Key(ASTYANAX_WRITE_CL)
 +    String getAstyanaxWriteCL();
 +
 +
 +    @Default("LOCAL_QUORUM")
 +    @Key(READ_CL)
 +    String getReadCl();
 +
 +    @Default("QUORUM")
 +    @Key(READ_CL_CONSISTENT)
 +    String getReadClConsistent();
 +
 +    @Default("LOCAL_QUORUM")
 +    @Key(WRITE_CL)
 +    String getWriteCl();
 +
 +    @Default("SimpleStrategy")
 +    @Key( STRATEGY )
 +    String getStrategy();
 +
 +    @Default("replication_factor:1")
 +    @Key( STRATEGY_OPTIONS )
 +    String getStrategyOptions();
 +
 +    /**
 +     * Return the history of all shard values which are immutable.  For instance, if shard values
 +     * are initially set to 20 (the default) then increased to 40, the property should contain the string of
 +     * "20, 40" so that we can read historic data.
 +     *
 +     * @return
 +     */
 +    @Default("20")
 +    @Key(SHARD_VALUES)
 +    String getShardValues();
 +
 +    /**
 +     * Get the thrift transport size.  Should be set to what is on the cassandra servers.  As we move to CQL, this will become obsolete
 +     * @return
 +     */
 +    @Key( THRIFT_TRANSPORT_SIZE)
 +    @Default( "15728640" )
 +    int getThriftBufferSize();
 +
 +
 +    /**
 +     * Returns the name of the keyspace that should be used for Locking
 +     */
 +    @Key( LOCKS_KEYSPACE_NAME )
 +    @Default("Locks")
 +    String getLocksKeyspace();
 +
 +    /**
 +     * Returns the Astyanax consistency level for writing a Lock
 +     */
 +    @Key(LOCKS_CL)
 +    @Default("CL_LOCAL_QUORUM")
 +    String getLocksCl();
 +
 +    /**
 +     * Returns a flag on whether or not to share the connection pool with other keyspaces
 +     */
 +    @Key( LOCKS_SHARED_POOL_FLAG )
 +    @Default("true")
 +    boolean useSharedPoolForLocks();
 +
 +    /**
 +     * Returns a flag on whether or not to share the connection pool with other keyspaces
 +     */
 +    @Key( LOCKS_CONNECTIONS )
 +    @Default( DEFAULT_CONNECTION_POOLSIZE )
 +    int getConnectionsLocks();
 +
 +    /**
 +     * Returns a flag on whether or not to share the connection pool with other keyspaces
 +     */
 +    @Key( LOCKS_KEYSPACE_REPLICATION )
 +    @Default("replication_factor:1")
 +    String getLocksKeyspaceReplication();
 +
 +    /**
 +     * Returns a flag on whether or not to share the connection pool with other keyspaces
 +     */
 +    @Key( LOCKS_KEYSPACE_STRATEGY )
 +    @Default( "org.apache.cassandra.locator.SimpleStrategy" )
 +    String getLocksKeyspaceStrategy();
 +
 +    /**
 +     * Return the expiration that should be used for expiring a lock if it's not released
 +     */
 +    @Key( LOCKS_EXPIRATION )
 +    @Default(DEFAULT_LOCKS_EXPIRATION)
 +    int getLocksExpiration();
 +
++    /**
++     * How many times to attempt lock keyspace and column family creation
++     */
++    @Key( LOCK_MANAGER_INIT_RETRIES )
++    @Default( "100" )
++    int getLockManagerInitRetries();
++
++    /**
++     * Return the expiration that should be used for expiring a lock if it's not released
++     */
++    @Key( LOCK_MANAGER_INIT_INTERVAL )
++    @Default( "1000" )
++    int getLockManagerInitInterval();
++
 +}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/common/src/test/resources/usergrid-UNIT.properties
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/rest/src/test/resources/usergrid-custom-test.properties
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java
----------------------------------------------------------------------
diff --cc stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java
index 0000000,68f366b..40cedcd
mode 000000,100644..100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java
@@@ -1,0 -1,298 +1,298 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.usergrid.tools;
+ 
+ 
+ import java.util.*;
+ import java.util.concurrent.atomic.AtomicInteger;
+ 
++import com.datastax.driver.core.ConsistencyLevel;
+ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+ import com.netflix.astyanax.model.Column;
 -import com.netflix.astyanax.model.ConsistencyLevel;
+ import com.netflix.astyanax.util.RangeBuilder;
+ import org.apache.usergrid.persistence.Entity;
+ import org.apache.usergrid.persistence.EntityManager;
+ import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+ import org.apache.usergrid.persistence.collection.serialization.impl.*;
+ import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+ import org.apache.usergrid.persistence.model.entity.SimpleId;
+ import org.apache.usergrid.persistence.model.field.StringField;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import org.apache.commons.cli.CommandLine;
+ import org.apache.commons.cli.Option;
+ import org.apache.commons.cli.OptionBuilder;
+ import org.apache.commons.cli.Options;
+ 
+ 
+ import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
+ import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+ import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
+ 
+ 
+ 
+ public class UniqueValueScanner extends ToolBase {
+ 
+     private static final Logger logger = LoggerFactory.getLogger( UniqueValueScanner.class );
+ 
+     private static final String APPLICATION_ARG = "app";
+ 
+     private static final String ENTITY_TYPE_ARG = "entityType";
+ 
+     private static final String ENTITY_NAME_ARG = "entityName";
+ 
+     private static final String ENTITY_FIELD_TYPE_ARG = "fieldType";
+ 
+ 
+ 
+     //copied shamelessly from unique value serialization strat.
+     private static final ScopedRowKeySerializer<TypeField> ROW_KEY_SER =
+         new ScopedRowKeySerializer<>( UniqueTypeFieldRowKeySerializer.get() );
+ 
+ 
+     private final EntityVersionSerializer ENTITY_VERSION_SER = new EntityVersionSerializer();
+ 
+     private final MultiTenantColumnFamily<ScopedRowKey<TypeField>, EntityVersion> CF_UNIQUE_VALUES =
+         new MultiTenantColumnFamily<>( "Unique_Values_V2", ROW_KEY_SER, ENTITY_VERSION_SER );
+ 
+     private com.netflix.astyanax.Keyspace keyspace;
+ 
+     private MvccEntitySerializationStrategy mvccEntitySerializationStrategy;
+ 
+     private UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+ 
+     private EntityManager em;
+ 
+     @Override
+     @SuppressWarnings( "static-access" )
+     public Options createOptions() {
+ 
+ 
+         Options options = super.createOptions();
+ 
+ 
+         Option appOption = OptionBuilder.withArgName( APPLICATION_ARG ).hasArg().isRequired( true )
+             .withDescription( "application id" ).create( APPLICATION_ARG );
+ 
+ 
+         options.addOption( appOption );
+ 
+         Option collectionOption =
+             OptionBuilder.withArgName(ENTITY_TYPE_ARG).hasArg().isRequired( true ).withDescription( "collection name" )
+                 .create(ENTITY_TYPE_ARG);
+ 
+         options.addOption( collectionOption );
+ 
+         Option specificEntityNameOption =
+             OptionBuilder.withArgName(ENTITY_NAME_ARG).hasArg().isRequired( false ).withDescription( "specific entity name" )
+                 .create(ENTITY_NAME_ARG);
+ 
+         options.addOption( specificEntityNameOption );
+ 
+         Option fieldTypeOption =
+             OptionBuilder.withArgName(ENTITY_FIELD_TYPE_ARG).hasArg().isRequired( false ).withDescription( "field type" )
+                 .create(ENTITY_FIELD_TYPE_ARG);
+ 
+         options.addOption( fieldTypeOption );
+ 
+         return options;
+     }
+ 
+ 
+     /*
+      * (non-Javadoc)
+      *
+      * @see
+      * org.apache.usergrid.tools.ToolBase#runTool(org.apache.commons.cli.CommandLine)
+      */
+     @Override
+     public void runTool( CommandLine line ) throws Exception {
+ 
+         startSpring();
+ 
+         UUID appToFilter = null;
+         if (!line.getOptionValue(APPLICATION_ARG).isEmpty()) {
+             appToFilter = UUID.fromString(line.getOptionValue(APPLICATION_ARG));
+         }
+ 
+         logger.info("Staring Tool: UniqueValueScanner");
+         logger.info("Using Cassandra consistency level: {}", System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM"));
+ 
+ 
+         keyspace = injector.getInstance(com.netflix.astyanax.Keyspace.class);
+         mvccEntitySerializationStrategy = injector.getInstance(MvccEntitySerializationStrategy.class);
+         uniqueValueSerializationStrategy = injector.getInstance(UniqueValueSerializationStrategy.class);
+ 
+         String fieldType =
+             line.getOptionValue(ENTITY_FIELD_TYPE_ARG) != null ?  line.getOptionValue(ENTITY_FIELD_TYPE_ARG)  : "name" ;
+         String entityType = line.getOptionValue(ENTITY_TYPE_ARG);
+         String entityName = line.getOptionValue(ENTITY_NAME_ARG);
+ 
+         AtomicInteger count = new AtomicInteger(0);
+ 
+         if (entityName != null && !entityName.isEmpty()) {
+ 
+             if(appToFilter == null){
+                 throw new RuntimeException("Cannot execute UniqueValueScanner with specific entity without the " +
+                     "application UUID for which the entity should exist.");
+             }
+ 
+             if(entityType == null){
+                 throw new RuntimeException("Cannot execute UniqueValueScanner without the entity type (singular " +
+                     "collection name).");
+             }
+ 
+             logger.info("Running entity unique load only");
+ 
+ 
+             //do stuff w/o read repair
+             UniqueValueSet uniqueValueSet = uniqueValueSerializationStrategy.load(
+                 new ApplicationScopeImpl( new SimpleId(appToFilter, "application" ) ),
 -                ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM")), entityType,
++                ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "LOCAL_QUORUM")), entityType,
+                 Collections.singletonList(new StringField( fieldType, entityName) ), false);
+ 
+             StringBuilder stringBuilder = new StringBuilder();
+ 
+             stringBuilder.append("[");
+ 
+             uniqueValueSet.forEach( uniqueValue -> {
+ 
+ 
+                 String entry = "fieldName="+uniqueValue.getField().getName()+
+                     ", fieldValue="+uniqueValue.getField().getValue()+
+                     ", uuid="+uniqueValue.getEntityId().getUuid()+
+                     ", type="+uniqueValue.getEntityId().getType()+
+                     ", version="+uniqueValue.getEntityVersion();
+                 stringBuilder.append("{").append(entry).append("},");
+             });
+ 
+             stringBuilder.deleteCharAt(stringBuilder.length() -1);
+             stringBuilder.append("]");
+ 
+             logger.info("Returned unique value set from serialization load = {}", stringBuilder.toString());
+ 
+         } else {
+ 
+             logger.info("Running entity unique scanner only");
+ 
+ 
+             // scan through all unique values and log some info
+ 
+             Iterator<com.netflix.astyanax.model.Row<ScopedRowKey<TypeField>, EntityVersion>> rows = null;
+             try {
+ 
+                 rows = keyspace.prepareQuery(CF_UNIQUE_VALUES)
 -                    .setConsistencyLevel(ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM")))
++                    .setConsistencyLevel(com.netflix.astyanax.model.ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM")))
+                     .getAllRows()
+                     .withColumnRange(new RangeBuilder().setLimit(1000).build())
+                     .execute().getResult().iterator();
+ 
+             } catch (ConnectionException e) {
+ 
+                 logger.error("Error connecting to cassandra", e);
+             }
+ 
+ 
+             UUID finalAppToFilter = appToFilter;
+ 
+             if( rows != null) {
+                 rows.forEachRemaining(row -> {
+ 
+                     count.incrementAndGet();
+ 
+                     if(count.get() % 1000 == 0 ){
+                         logger.info("Scanned {} rows in {}", count.get(), CF_UNIQUE_VALUES.getName());
+                     }
+ 
+                     final String fieldName = row.getKey().getKey().getField().getName();
+                     final String fieldValue = row.getKey().getKey().getField().getValue().toString();
+                     final String scopeType = row.getKey().getScope().getType();
+                     final UUID scopeUUID = row.getKey().getScope().getUuid();
+ 
+ 
+                     if (!fieldName.equalsIgnoreCase(fieldType) ||
+                         (finalAppToFilter != null && !finalAppToFilter.equals(scopeUUID))
+                         ) {
+                         // do nothing
+ 
+                     } else {
+ 
+ 
+                         // if we have more than 1 column, let's check for a duplicate
+                         if (row.getColumns() != null && row.getColumns().size() > 1) {
+ 
+                             final List<EntityVersion> values = new ArrayList<>(row.getColumns().size());
+ 
+                             Iterator<Column<EntityVersion>> columns = row.getColumns().iterator();
+                             columns.forEachRemaining(column -> {
+ 
+ 
+                                 final EntityVersion entityVersion = column.getName();
+ 
+ 
+                                 logger.trace(
+                                     scopeType + ": " + scopeUUID + ", " +
+                                         fieldName + ": " + fieldValue + ", " +
+                                         "entity type: " + entityVersion.getEntityId().getType() + ", " +
+                                         "entity uuid: " + entityVersion.getEntityId().getUuid()
+                                 );
+ 
+ 
+                                 if (entityType != null &&
+                                     entityVersion.getEntityId().getType().equalsIgnoreCase(entityType)
+                                     ) {
+ 
+                                     // add the first value into the list
+                                     if (values.size() == 0) {
+ 
+                                         values.add(entityVersion);
+ 
+ 
+                                     } else {
+ 
+                                         if (!values.get(0).getEntityId().getUuid().equals(entityVersion.getEntityId().getUuid())) {
+ 
+                                             values.add(entityVersion);
+ 
+                                             logger.error("Duplicate found for field [{}={}].  Entry 1: [{}], Entry 2: [{}]",
+                                                 fieldName, fieldValue, values.get(0).getEntityId(), entityVersion.getEntityId());
+ 
+                                         }
+ 
+                                     }
+ 
+ 
+                                 }
+ 
+                             });
+                         }
+                     }
+ 
+ 
+                 });
+             }else{
+ 
+                 logger.warn("No rows returned from table: {}", CF_UNIQUE_VALUES.getName());
+ 
+             }
+ 
+         }
+     }
+ }