You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/07/26 18:13:14 UTC
[47/50] [abbrv] usergrid git commit: Merge branch 'master' into
datastax-cass-driver Fix issue with UniqueValueSerialization backwards
compatibility via CQL and legacy Usergrid data.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
index 185cfb7,3dbf1ec..c4c083f
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
@@@ -23,8 -23,7 +23,10 @@@ import java.util.Collections
import java.util.Iterator;
import java.util.UUID;
-import com.netflix.astyanax.model.ConsistencyLevel;
+import com.datastax.driver.core.BatchStatement;
++import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
++
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@@ -49,7 -47,7 +50,6 @@@ import org.apache.usergrid.persistence.
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import com.google.inject.Inject;
--import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import static org.junit.Assert.assertEquals;
@@@ -365,4 -341,232 +365,229 @@@ public abstract class UniqueValueSerial
}
+ /**
+ * Test that inserting duplicates always show the oldest entity UUID being returned (versions of that OK to change).
+ *
+ * @throws ConnectionException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testWritingDuplicates() throws ConnectionException, InterruptedException {
+
+ ApplicationScope scope =
+ new ApplicationScopeImpl( new SimpleId( "organization" ) );
+
+ IntegerField field = new IntegerField( "count", 5 );
+ Id entityId1 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+ Id entityId2 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+
+
+
+ UUID version1 = UUIDGenerator.newTimeUUID();
+ UUID version2 = UUIDGenerator.newTimeUUID();
+
+ UniqueValue stored1 = new UniqueValueImpl( field, entityId1, version2 );
+ UniqueValue stored2 = new UniqueValueImpl( field, entityId2, version1 );
+
+
- strategy.write( scope, stored1 ).execute();
- strategy.write( scope, stored2 ).execute();
++ session.execute(strategy.writeCQL( scope, stored1, -1 ));
++ session.execute(strategy.writeCQL( scope, stored2, -1 ));
+
+ // load descending to get the older version of entity for this unique value
- UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
++ UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM,
+ entityId1.getType(), Collections.<Field>singleton( field ), true);
+
+ UniqueValue retrieved = fields.getValue( field.getName() );
+
+ // validate that the first entity UUID is returned after inserting a duplicate mapping
+ assertEquals( stored1, retrieved );
+
+
+
+ UUID version3 = UUIDGenerator.newTimeUUID();
+ UniqueValue stored3 = new UniqueValueImpl( field, entityId2, version3);
- strategy.write( scope, stored3 ).execute();
++ session.execute(strategy.writeCQL( scope, stored3, -1 ));
+
+ // load the values again, we should still only get back the original unique value
- fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
++ fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM,
+ entityId1.getType(), Collections.<Field>singleton( field ), true);
+
+ retrieved = fields.getValue( field.getName() );
+
+ // validate that the first entity UUID is still returned after inserting duplicate with newer version
+ assertEquals( stored1, retrieved );
+
+
+ UUID version4 = UUIDGenerator.newTimeUUID();
+ UniqueValue stored4 = new UniqueValueImpl( field, entityId1, version4);
- strategy.write( scope, stored4 ).execute();
++ session.execute(strategy.writeCQL( scope, stored4, -1 ));
+
+ // load the values again, now we should get the latest version of the original UUID written
- fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
++ fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM,
+ entityId1.getType(), Collections.<Field>singleton( field ), true);
+
+ retrieved = fields.getValue( field.getName() );
+
+ // validate that the first entity UUID is still returned, but with the latest version
+ assertEquals( stored4, retrieved );
+
+ }
+
+ /**
+ * Test that inserting multiple versions of the same entity UUID result in the latest version being returned.
+ *
+ * @throws ConnectionException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testMultipleVersionsSameEntity() throws ConnectionException, InterruptedException {
+
+ ApplicationScope scope =
+ new ApplicationScopeImpl( new SimpleId( "organization" ) );
+
+ IntegerField field = new IntegerField( "count", 5 );
+ Id entityId1 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+
+
+
+ UUID version1 = UUIDGenerator.newTimeUUID();
+ UUID version2 = UUIDGenerator.newTimeUUID();
+
+ UniqueValue stored1 = new UniqueValueImpl( field, entityId1, version1 );
+ UniqueValue stored2 = new UniqueValueImpl( field, entityId1, version2 );
+
+
- strategy.write( scope, stored1 ).execute();
- strategy.write( scope, stored2 ).execute();
++ session.execute(strategy.writeCQL( scope, stored1, -1 ));
++ session.execute(strategy.writeCQL( scope, stored2, -1 ));
+
+ // load descending to get the older version of entity for this unique value
- UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
++ UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM,
+ entityId1.getType(), Collections.<Field>singleton( field ), true);
+
+ UniqueValue retrieved = fields.getValue( field.getName() );
+ Assert.assertNotNull( retrieved );
+ assertEquals( stored2, retrieved );
+
+
+ }
+
+ @Test
+ public void testDuplicateEntitiesDescending() throws ConnectionException, InterruptedException {
+
+ ApplicationScope scope =
+ new ApplicationScopeImpl( new SimpleId( "organization" ) );
+
+ IntegerField field = new IntegerField( "count", 5 );
+ Id entityId1 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+ Id entityId2 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+ Id entityId3 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+
+
+
+ UUID version1 = UUIDGenerator.newTimeUUID();
+ UUID version2 = UUIDGenerator.newTimeUUID();
+ UUID version3 = UUIDGenerator.newTimeUUID();
+
+ UniqueValue stored1 = new UniqueValueImpl( field, entityId3, version1 );
+ UniqueValue stored2 = new UniqueValueImpl( field, entityId2, version2 );
+ UniqueValue stored3 = new UniqueValueImpl( field, entityId1, version3 );
+
+
- strategy.write( scope, stored1 ).execute();
- strategy.write( scope, stored2 ).execute();
- strategy.write( scope, stored3 ).execute();
++ session.execute(strategy.writeCQL( scope, stored1, -1 ));
++ session.execute(strategy.writeCQL( scope, stored2, -1 ));
++ session.execute(strategy.writeCQL( scope, stored3, -1 ));
+
+
+ // load descending to get the older version of entity for this unique value
- UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
++ UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM,
+ entityId1.getType(), Collections.<Field>singleton( field ), true);
+
+
- fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
- entityId1.getType(), Collections.<Field>singleton( field ), false);
-
+ UniqueValue retrieved = fields.getValue( field.getName() );
+ assertEquals( stored3, retrieved );
+
+
+ }
+
+ @Test
+ public void testDuplicateEntitiesAscending() throws ConnectionException, InterruptedException {
+
+ ApplicationScope scope =
+ new ApplicationScopeImpl( new SimpleId( "organization" ) );
+
+ IntegerField field = new IntegerField( "count", 5 );
+ Id entityId1 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+ Id entityId2 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+ Id entityId3 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+
+
+
+ UUID version1 = UUIDGenerator.newTimeUUID();
+ UUID version2 = UUIDGenerator.newTimeUUID();
+ UUID version3 = UUIDGenerator.newTimeUUID();
+
+ UniqueValue stored1 = new UniqueValueImpl( field, entityId1, version1 );
+ UniqueValue stored2 = new UniqueValueImpl( field, entityId2, version2 );
+ UniqueValue stored3 = new UniqueValueImpl( field, entityId3, version3 );
+
+
- strategy.write( scope, stored1 ).execute();
- strategy.write( scope, stored2 ).execute();
- strategy.write( scope, stored3 ).execute();
++ session.execute(strategy.writeCQL( scope, stored1, -1 ));
++ session.execute(strategy.writeCQL( scope, stored2, -1 ));
++ session.execute(strategy.writeCQL( scope, stored3, -1 ));
+
+
+ // load descending to get the older version of entity for this unique value
+ UniqueValueSet fields = strategy.load( scope,
- ConsistencyLevel.CL_LOCAL_QUORUM, entityId1.getType(), Collections.<Field>singleton( field ), true);
++ ConsistencyLevel.LOCAL_QUORUM, entityId1.getType(), Collections.<Field>singleton( field ), true);
+
+ UniqueValue retrieved = fields.getValue( field.getName() );
+ assertEquals( stored1, retrieved );
+
+
+ }
+
+ @Test
+ public void testMixedDuplicates() throws ConnectionException, InterruptedException {
+
+ ApplicationScope scope =
+ new ApplicationScopeImpl( new SimpleId( "organization" ) );
+
+ IntegerField field = new IntegerField( "count", 5 );
+ Id entityId1 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+ Id entityId2 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+ Id entityId3 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+
+
+
+ UUID version1 = UUIDGenerator.newTimeUUID();
+ UUID version2 = UUIDGenerator.newTimeUUID();
+ UUID version3 = UUIDGenerator.newTimeUUID();
+ UUID version4 = UUIDGenerator.newTimeUUID();
+ UUID version5 = UUIDGenerator.newTimeUUID();
+
+ UniqueValue stored1 = new UniqueValueImpl( field, entityId1, version5 );
+ UniqueValue stored2 = new UniqueValueImpl( field, entityId2, version4 );
+ UniqueValue stored3 = new UniqueValueImpl( field, entityId1, version3 );
+ UniqueValue stored4 = new UniqueValueImpl( field, entityId3, version2 );
+ UniqueValue stored5 = new UniqueValueImpl( field, entityId3, version1 );
+
+
+
- strategy.write( scope, stored1 ).execute();
- strategy.write( scope, stored2 ).execute();
- strategy.write( scope, stored3 ).execute();
- strategy.write( scope, stored4 ).execute();
- strategy.write( scope, stored5 ).execute();
++ session.execute(strategy.writeCQL( scope, stored1, -1 ));
++ session.execute(strategy.writeCQL( scope, stored2, -1 ));
++ session.execute(strategy.writeCQL( scope, stored3, -1 ));
++ session.execute(strategy.writeCQL( scope, stored4, -1 ));
++ session.execute(strategy.writeCQL( scope, stored5, -1 ));
+
+
+ // load descending to get the older version of entity for this unique value
- UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
++ UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM,
+ entityId1.getType(), Collections.<Field>singleton( field ), true);
+
+ UniqueValue retrieved = fields.getValue( field.getName() );
+ assertEquals( stored1, retrieved );
+
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties
index 92d0041,d404b1e..ebae735
--- a/stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties
+++ b/stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties
@@@ -1,7 -1,26 +1,25 @@@
+ # Licensed to the Apache Software Foundation (ASF) under one
+ # or more contributor license agreements. See the NOTICE file
+ # distributed with this work for additional information
+ # regarding copyright ownership. The ASF licenses this file
+ # to you under the Apache License, Version 2.0 (the
+ # "License"); you may not use this file except in compliance
+ # with the License. You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing,
+ # software distributed under the License is distributed on an
+ # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ # KIND, either express or implied. See the License for the
+ # specific language governing permissions and limitations
+ # under the License.
+ #
+
# These are for CHOP environment settings
-cassandra.connections=20
+cassandra.connections=50
cassandra.port=9160
-cassandra.version=1.2
# a comma delimited private IP address list to your chop cassandra cluster
# define this in your settings.xml and have it as an always active profile
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/test/resources/usergrid-UNIT.properties
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/common/pom.xml
----------------------------------------------------------------------
diff --cc stack/corepersistence/common/pom.xml
index c389a5b,bbcadff..63d339b
--- a/stack/corepersistence/common/pom.xml
+++ b/stack/corepersistence/common/pom.xml
@@@ -57,9 -73,9 +73,13 @@@
<version>${cassandra.version}</version>
<exclusions>
<exclusion>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
++ <exclusion>
+ <artifactId>netty</artifactId>
+ <groupId>io.netty</groupId>
+ </exclusion>
</exclusions>
</dependency>
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
index 3c58dfb,0000000..2996465
mode 100644,000000..100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
@@@ -1,225 -1,0 +1,239 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.core;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
+ * Cassandra configuration interface.
+ */
+@FigSingleton
+public interface CassandraFig extends GuicyFig {
+
+ // cassndra properties used by datastax driver
+ String READ_CL = "cassandra.readcl";
+ String READ_CL_CONSISTENT = "cassandra.readcl.consistent";
+ String WRITE_CL = "cassandra.writecl";
+ String STRATEGY = "cassandra.strategy";
+ String STRATEGY_OPTIONS = "cassandra.strategy.options";
+
+ // main application cassandra properties
+ String ASTYANAX_READ_CONSISTENT_CL = "usergrid.consistent.read.cl";
+ String ASTYANAX_READ_CL = "usergrid.read.cl";
+ String ASTYANAX_WRITE_CL = "usergrid.write.cl";
+ String SHARD_VALUES = "cassandra.shardvalues";
+ String THRIFT_TRANSPORT_SIZE = "cassandra.thrift.transport.frame";
+ String USERNAME = "cassandra.username";
+ String PASSWORD = "cassandra.password";
+
+ // locks cassandra properties
+ String LOCKS_KEYSPACE_NAME = "cassandra.lock.keyspace";
+ String LOCKS_KEYSPACE_REPLICATION = "cassandra.lock.keyspace.replication";
+ String LOCKS_KEYSPACE_STRATEGY = "cassandra.lock.keyspace.strategy";
+ String LOCKS_CL = "cassandra.lock.cl";
+ String LOCKS_SHARED_POOL_FLAG = "cassandra.lock.use_shared_pool";
+ String LOCKS_CONNECTIONS = "cassandra.lock.connections";
+ String LOCKS_EXPIRATION = "cassandra.lock.expiration.milliseconds";
+
-
-
++ String LOCK_MANAGER_INIT_RETRIES = "cassandra.lock.init.retries";
++ String LOCK_MANAGER_INIT_INTERVAL = "cassandra.lock.init.interval";
+
+ // re-usable default values
+ String DEFAULT_CONNECTION_POOLSIZE = "15";
+ String DEFAULT_LOCKS_EXPIRATION = "3600000"; // 1 hour
+ String DEFAULT_LOCAL_DC = "";
+ String DEFAULT_USERNAME = "";
+ String DEFAULT_PASSWORD = "";
+
+
+ @Key( "cassandra.hosts" )
+ String getHosts();
+
+ /**
+ * Valid options are 1.2, 2.0, 2.1
+ *
+ * @return
+ */
+ @Key( "cassandra.version" )
+ @Default( "2.1" )
+ String getVersion();
+
+ @Key( "cassandra.cluster_name" )
+ @Default( "Usergrid" )
+ String getClusterName();
+
+ @Key( "cassandra.keyspace.application" )
+ @Default( "Usergrid_Applications" )
+ String getApplicationKeyspace();
+
+ @Key( "cassandra.port" )
+ @Default( "9160" )
+ int getThriftPort();
+
+ @Key( USERNAME )
+ @Default( DEFAULT_USERNAME )
+ String getUsername();
+
+ @Key( PASSWORD )
+ @Default( DEFAULT_PASSWORD )
+ String getPassword();
+
+ @Key( "cassandra.datacenter.local" )
+ @Default( DEFAULT_LOCAL_DC )
+ String getLocalDataCenter();
+
+ @Key( "cassandra.connections" )
+ @Default( DEFAULT_CONNECTION_POOLSIZE )
+ int getConnections();
+
+ @Key( "cassandra.timeout" )
+ @Default( "10000" )
+ int getTimeout();
+
+ @Key( "cassandra.timeout.pool" )
+ @Default( "5000" )
+ int getPoolTimeout();
+
+ @Key("cassandra.discovery")
+ @Default( "RING_DESCRIBE" )
+ String getDiscoveryType();
+
+
+ @Default("CL_LOCAL_QUORUM")
+ @Key(ASTYANAX_READ_CL)
+ String getAstyanaxReadCL();
+
+ @Default("CL_QUORUM")
+ @Key(ASTYANAX_READ_CONSISTENT_CL)
+ String getAstyanaxConsistentReadCL();
+
+ @Default("CL_LOCAL_QUORUM")
+ @Key(ASTYANAX_WRITE_CL)
+ String getAstyanaxWriteCL();
+
+
+ @Default("LOCAL_QUORUM")
+ @Key(READ_CL)
+ String getReadCl();
+
+ @Default("QUORUM")
+ @Key(READ_CL_CONSISTENT)
+ String getReadClConsistent();
+
+ @Default("LOCAL_QUORUM")
+ @Key(WRITE_CL)
+ String getWriteCl();
+
+ @Default("SimpleStrategy")
+ @Key( STRATEGY )
+ String getStrategy();
+
+ @Default("replication_factor:1")
+ @Key( STRATEGY_OPTIONS )
+ String getStrategyOptions();
+
+ /**
+ * Return the history of all shard values which are immutable. For instance, if shard values
+ * are initially set to 20 (the default) then increased to 40, the property should contain the string of
+ * "20, 40" so that we can read historic data.
+ *
+ * @return
+ */
+ @Default("20")
+ @Key(SHARD_VALUES)
+ String getShardValues();
+
+ /**
+ * Get the thrift transport size. Should be set to what is on the cassandra servers. As we move to CQL, this will become obsolete
+ * @return
+ */
+ @Key( THRIFT_TRANSPORT_SIZE)
+ @Default( "15728640" )
+ int getThriftBufferSize();
+
+
+ /**
+ * Returns the name of the keyspace that should be used for Locking
+ */
+ @Key( LOCKS_KEYSPACE_NAME )
+ @Default("Locks")
+ String getLocksKeyspace();
+
+ /**
+ * Returns the Astyanax consistency level for writing a Lock
+ */
+ @Key(LOCKS_CL)
+ @Default("CL_LOCAL_QUORUM")
+ String getLocksCl();
+
+ /**
+ * Returns a flag on whether or not to share the connection pool with other keyspaces
+ */
+ @Key( LOCKS_SHARED_POOL_FLAG )
+ @Default("true")
+ boolean useSharedPoolForLocks();
+
+ /**
+ * Returns a flag on whether or not to share the connection pool with other keyspaces
+ */
+ @Key( LOCKS_CONNECTIONS )
+ @Default( DEFAULT_CONNECTION_POOLSIZE )
+ int getConnectionsLocks();
+
+ /**
+ * Returns a flag on whether or not to share the connection pool with other keyspaces
+ */
+ @Key( LOCKS_KEYSPACE_REPLICATION )
+ @Default("replication_factor:1")
+ String getLocksKeyspaceReplication();
+
+ /**
+ * Returns a flag on whether or not to share the connection pool with other keyspaces
+ */
+ @Key( LOCKS_KEYSPACE_STRATEGY )
+ @Default( "org.apache.cassandra.locator.SimpleStrategy" )
+ String getLocksKeyspaceStrategy();
+
+ /**
+ * Return the expiration that should be used for expiring a lock if it's not released
+ */
+ @Key( LOCKS_EXPIRATION )
+ @Default(DEFAULT_LOCKS_EXPIRATION)
+ int getLocksExpiration();
+
++ /**
++ * How many times to attempt lock keyspace and column family creation
++ */
++ @Key( LOCK_MANAGER_INIT_RETRIES )
++ @Default( "100" )
++ int getLockManagerInitRetries();
++
++ /**
++ * Return the expiration that should be used for expiring a lock if it's not released
++ */
++ @Key( LOCK_MANAGER_INIT_INTERVAL )
++ @Default( "1000" )
++ int getLockManagerInitInterval();
++
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/common/src/test/resources/usergrid-UNIT.properties
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/rest/src/test/resources/usergrid-custom-test.properties
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java
----------------------------------------------------------------------
diff --cc stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java
index 0000000,68f366b..40cedcd
mode 000000,100644..100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java
@@@ -1,0 -1,298 +1,298 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.usergrid.tools;
+
+
+ import java.util.*;
+ import java.util.concurrent.atomic.AtomicInteger;
+
++import com.datastax.driver.core.ConsistencyLevel;
+ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+ import com.netflix.astyanax.model.Column;
-import com.netflix.astyanax.model.ConsistencyLevel;
+ import com.netflix.astyanax.util.RangeBuilder;
+ import org.apache.usergrid.persistence.Entity;
+ import org.apache.usergrid.persistence.EntityManager;
+ import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+ import org.apache.usergrid.persistence.collection.serialization.impl.*;
+ import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+ import org.apache.usergrid.persistence.model.entity.SimpleId;
+ import org.apache.usergrid.persistence.model.field.StringField;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import org.apache.commons.cli.CommandLine;
+ import org.apache.commons.cli.Option;
+ import org.apache.commons.cli.OptionBuilder;
+ import org.apache.commons.cli.Options;
+
+
+ import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
+ import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+ import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
+
+
+
+ public class UniqueValueScanner extends ToolBase {
+
+ private static final Logger logger = LoggerFactory.getLogger( UniqueValueScanner.class );
+
+ private static final String APPLICATION_ARG = "app";
+
+ private static final String ENTITY_TYPE_ARG = "entityType";
+
+ private static final String ENTITY_NAME_ARG = "entityName";
+
+ private static final String ENTITY_FIELD_TYPE_ARG = "fieldType";
+
+
+
+ //copied shamelessly from unique value serialization strat.
+ private static final ScopedRowKeySerializer<TypeField> ROW_KEY_SER =
+ new ScopedRowKeySerializer<>( UniqueTypeFieldRowKeySerializer.get() );
+
+
+ private final EntityVersionSerializer ENTITY_VERSION_SER = new EntityVersionSerializer();
+
+ private final MultiTenantColumnFamily<ScopedRowKey<TypeField>, EntityVersion> CF_UNIQUE_VALUES =
+ new MultiTenantColumnFamily<>( "Unique_Values_V2", ROW_KEY_SER, ENTITY_VERSION_SER );
+
+ private com.netflix.astyanax.Keyspace keyspace;
+
+ private MvccEntitySerializationStrategy mvccEntitySerializationStrategy;
+
+ private UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+
+ private EntityManager em;
+
+ @Override
+ @SuppressWarnings( "static-access" )
+ public Options createOptions() {
+
+
+ Options options = super.createOptions();
+
+
+ Option appOption = OptionBuilder.withArgName( APPLICATION_ARG ).hasArg().isRequired( true )
+ .withDescription( "application id" ).create( APPLICATION_ARG );
+
+
+ options.addOption( appOption );
+
+ Option collectionOption =
+ OptionBuilder.withArgName(ENTITY_TYPE_ARG).hasArg().isRequired( true ).withDescription( "collection name" )
+ .create(ENTITY_TYPE_ARG);
+
+ options.addOption( collectionOption );
+
+ Option specificEntityNameOption =
+ OptionBuilder.withArgName(ENTITY_NAME_ARG).hasArg().isRequired( false ).withDescription( "specific entity name" )
+ .create(ENTITY_NAME_ARG);
+
+ options.addOption( specificEntityNameOption );
+
+ Option fieldTypeOption =
+ OptionBuilder.withArgName(ENTITY_FIELD_TYPE_ARG).hasArg().isRequired( false ).withDescription( "field type" )
+ .create(ENTITY_FIELD_TYPE_ARG);
+
+ options.addOption( fieldTypeOption );
+
+ return options;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.usergrid.tools.ToolBase#runTool(org.apache.commons.cli.CommandLine)
+ */
+ @Override
+ public void runTool( CommandLine line ) throws Exception {
+
+ startSpring();
+
+ UUID appToFilter = null;
+ if (!line.getOptionValue(APPLICATION_ARG).isEmpty()) {
+ appToFilter = UUID.fromString(line.getOptionValue(APPLICATION_ARG));
+ }
+
+ logger.info("Staring Tool: UniqueValueScanner");
+ logger.info("Using Cassandra consistency level: {}", System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM"));
+
+
+ keyspace = injector.getInstance(com.netflix.astyanax.Keyspace.class);
+ mvccEntitySerializationStrategy = injector.getInstance(MvccEntitySerializationStrategy.class);
+ uniqueValueSerializationStrategy = injector.getInstance(UniqueValueSerializationStrategy.class);
+
+ String fieldType =
+ line.getOptionValue(ENTITY_FIELD_TYPE_ARG) != null ? line.getOptionValue(ENTITY_FIELD_TYPE_ARG) : "name" ;
+ String entityType = line.getOptionValue(ENTITY_TYPE_ARG);
+ String entityName = line.getOptionValue(ENTITY_NAME_ARG);
+
+ AtomicInteger count = new AtomicInteger(0);
+
+ if (entityName != null && !entityName.isEmpty()) {
+
+ if(appToFilter == null){
+ throw new RuntimeException("Cannot execute UniqueValueScanner with specific entity without the " +
+ "application UUID for which the entity should exist.");
+ }
+
+ if(entityType == null){
+ throw new RuntimeException("Cannot execute UniqueValueScanner without the entity type (singular " +
+ "collection name).");
+ }
+
+ logger.info("Running entity unique load only");
+
+
+ //do stuff w/o read repair
+ UniqueValueSet uniqueValueSet = uniqueValueSerializationStrategy.load(
+ new ApplicationScopeImpl( new SimpleId(appToFilter, "application" ) ),
- ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM")), entityType,
++ ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "LOCAL_QUORUM")), entityType,
+ Collections.singletonList(new StringField( fieldType, entityName) ), false);
+
+ StringBuilder stringBuilder = new StringBuilder();
+
+ stringBuilder.append("[");
+
+ uniqueValueSet.forEach( uniqueValue -> {
+
+
+ String entry = "fieldName="+uniqueValue.getField().getName()+
+ ", fieldValue="+uniqueValue.getField().getValue()+
+ ", uuid="+uniqueValue.getEntityId().getUuid()+
+ ", type="+uniqueValue.getEntityId().getType()+
+ ", version="+uniqueValue.getEntityVersion();
+ stringBuilder.append("{").append(entry).append("},");
+ });
+
+ stringBuilder.deleteCharAt(stringBuilder.length() -1);
+ stringBuilder.append("]");
+
+ logger.info("Returned unique value set from serialization load = {}", stringBuilder.toString());
+
+ } else {
+
+ logger.info("Running entity unique scanner only");
+
+
+ // scan through all unique values and log some info
+
+ Iterator<com.netflix.astyanax.model.Row<ScopedRowKey<TypeField>, EntityVersion>> rows = null;
+ try {
+
+ rows = keyspace.prepareQuery(CF_UNIQUE_VALUES)
- .setConsistencyLevel(ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM")))
++ .setConsistencyLevel(com.netflix.astyanax.model.ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM")))
+ .getAllRows()
+ .withColumnRange(new RangeBuilder().setLimit(1000).build())
+ .execute().getResult().iterator();
+
+ } catch (ConnectionException e) {
+
+ logger.error("Error connecting to cassandra", e);
+ }
+
+
+ UUID finalAppToFilter = appToFilter;
+
+ if( rows != null) {
+ rows.forEachRemaining(row -> {
+
+ count.incrementAndGet();
+
+ if(count.get() % 1000 == 0 ){
+ logger.info("Scanned {} rows in {}", count.get(), CF_UNIQUE_VALUES.getName());
+ }
+
+ final String fieldName = row.getKey().getKey().getField().getName();
+ final String fieldValue = row.getKey().getKey().getField().getValue().toString();
+ final String scopeType = row.getKey().getScope().getType();
+ final UUID scopeUUID = row.getKey().getScope().getUuid();
+
+
+ if (!fieldName.equalsIgnoreCase(fieldType) ||
+ (finalAppToFilter != null && !finalAppToFilter.equals(scopeUUID))
+ ) {
+ // do nothing
+
+ } else {
+
+
+ // if we have more than 1 column, let's check for a duplicate
+ if (row.getColumns() != null && row.getColumns().size() > 1) {
+
+ final List<EntityVersion> values = new ArrayList<>(row.getColumns().size());
+
+ Iterator<Column<EntityVersion>> columns = row.getColumns().iterator();
+ columns.forEachRemaining(column -> {
+
+
+ final EntityVersion entityVersion = column.getName();
+
+
+ logger.trace(
+ scopeType + ": " + scopeUUID + ", " +
+ fieldName + ": " + fieldValue + ", " +
+ "entity type: " + entityVersion.getEntityId().getType() + ", " +
+ "entity uuid: " + entityVersion.getEntityId().getUuid()
+ );
+
+
+ if (entityType != null &&
+ entityVersion.getEntityId().getType().equalsIgnoreCase(entityType)
+ ) {
+
+ // add the first value into the list
+ if (values.size() == 0) {
+
+ values.add(entityVersion);
+
+
+ } else {
+
+ if (!values.get(0).getEntityId().getUuid().equals(entityVersion.getEntityId().getUuid())) {
+
+ values.add(entityVersion);
+
+ logger.error("Duplicate found for field [{}={}]. Entry 1: [{}], Entry 2: [{}]",
+ fieldName, fieldValue, values.get(0).getEntityId(), entityVersion.getEntityId());
+
+ }
+
+ }
+
+
+ }
+
+ });
+ }
+ }
+
+
+ });
+ }else{
+
+ logger.warn("No rows returned from table: {}", CF_UNIQUE_VALUES.getName());
+
+ }
+
+ }
+ }
+ }