You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/16 22:34:50 UTC

[21/25] usergrid git commit: Fixes to get all tests passing again.

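Fixes to get all tests passing again:
- add separate replication strategy settings (cassandra.strategy.local, cassandra.strategy.options.local) and use them when creating the application-local keyspace;
- make the AbstractTest "migrated" flag static;
- re-enable previously @Ignore'd queue tests;
- call distributedQueueService.refresh() while polling in QueueActorServiceTest;
- use randomly generated queue names in the shard and message iterator tests;
- adjust the test log4j.properties and qakka.properties settings.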


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/99dbfc2d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/99dbfc2d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/99dbfc2d

Branch: refs/heads/usergrid-1318-queue
Commit: 99dbfc2d17e330583560d386b6ecb5cce93fa3e5
Parents: 447b60d
Author: Dave Johnson <sn...@apache.org>
Authored: Fri Sep 16 09:39:51 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Fri Sep 16 09:39:51 2016 -0400

----------------------------------------------------------------------
 .../usergrid/persistence/core/CassandraFig.java | 12 ++++++++++
 .../core/datastax/impl/DataStaxClusterImpl.java |  2 +-
 .../persistence/qakka/AbstractTest.java         |  2 +-
 .../qakka/core/QueueMessageManagerTest.java     |  2 --
 .../distributed/QueueActorServiceTest.java      |  1 +
 .../distributed/actors/QueueTimeouterTest.java  |  1 -
 .../distributed/actors/ShardAllocatorTest.java  |  1 -
 ...tiShardDatabaseQueueMessageIteratorTest.java | 23 +++++++++++---------
 .../sharding/ShardIteratorTest.java             | 18 +++++++++------
 .../queue/src/test/resources/log4j.properties   |  3 +++
 .../queue/src/test/resources/qakka.properties   | 12 +++-------
 11 files changed, 45 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
index 90f4ae8..b599a20 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
@@ -35,9 +35,13 @@ public interface CassandraFig extends GuicyFig {
     String READ_CL = "cassandra.readcl";
     String READ_CL_CONSISTENT = "cassandra.readcl.consistent";
     String WRITE_CL = "cassandra.writecl";
+
     String STRATEGY = "cassandra.strategy";
     String STRATEGY_OPTIONS = "cassandra.strategy.options";
 
+    String STRATEGY_LOCAL = "cassandra.strategy.local";
+    String STRATEGY_OPTIONS_LOCAL = "cassandra.strategy.options.local";
+
     // main application cassandra properties
     String ASTYANAX_READ_CONSISTENT_CL = "usergrid.consistent.read.cl";
     String ASTYANAX_READ_CL = "usergrid.read.cl";
@@ -157,6 +161,14 @@ public interface CassandraFig extends GuicyFig {
     @Key( STRATEGY_OPTIONS )
     String getStrategyOptions();
 
+    @Default("SimpleStrategy")
+    @Key( STRATEGY_LOCAL )
+    String getStrategyLocal();
+
+    @Default("replication_factor:1")
+    @Key( STRATEGY_OPTIONS_LOCAL )
+    String getStrategyOptionsLocal();
+
     /**
      * Return the history of all shard values which are immutable.  For instance, if shard values
      * are initially set to 20 (the default) then increased to 40, the property should contain the string of

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
index fe9803d..c8ddf3e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
@@ -148,7 +148,7 @@ public class DataStaxClusterImpl implements DataStaxCluster {
         final String createQueueMessageKeyspace = String.format(
             "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s",
             CQLUtils.quote(cassandraFig.getApplicationLocalKeyspace()),
-            CQLUtils.getFormattedReplication(cassandraFig.getStrategy(), cassandraFig.getStrategyOptions())
+            CQLUtils.getFormattedReplication(cassandraFig.getStrategyLocal(), cassandraFig.getStrategyOptionsLocal())
 
         );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java
index 6f1c744..c90db2e 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java
@@ -40,7 +40,7 @@ public class AbstractTest {
 
     protected static Injector sharedInjector;
 
-    AtomicBoolean migrated = new AtomicBoolean( false );
+    static AtomicBoolean migrated = new AtomicBoolean( false );
 
     static { new KeyspaceDropper(); }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index 630c953..124cb86 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -134,7 +134,6 @@ public class QueueMessageManagerTest extends AbstractTest {
 
 
     @Test
-    @Ignore
     public void testQueueMessageTimeouts() throws Exception {
 
         Injector injector = getInjector();
@@ -223,7 +222,6 @@ public class QueueMessageManagerTest extends AbstractTest {
 
 
     @Test
-    @Ignore
     public void testGetWithMissingData() throws InterruptedException {
 
         Injector injector = getInjector();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index a46c186..0883650 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -156,6 +156,7 @@ public class QueueActorServiceTest extends AbstractTest {
         int count = 0;
         while ( retries++ < maxRetries ) {
             Thread.sleep( 1000 );
+            distributedQueueService.refresh();
             if (inMemoryQueue.size( queueName ) == 100) {
                 count = 100;
                 break;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
index 54f9d42..e3541a4 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
@@ -54,7 +54,6 @@ public class QueueTimeouterTest extends AbstractTest {
 
 
     @Test
-    @Ignore
     public void testBasicOperation() throws Exception {
 
         CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
index ae62c89..7fd664f 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
@@ -163,7 +163,6 @@ public class ShardAllocatorTest extends AbstractTest {
 
 
     @Test
-    @Ignore
     public void testBasicOperationWithMessages() throws InterruptedException {
 
         Injector injector = getInjector();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
index 2d8da6d..5fa3434 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.usergrid.persistence.qakka.serialization;
 
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.usergrid.persistence.core.CassandraFig;
 import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
 import org.apache.usergrid.persistence.qakka.AbstractTest;
@@ -56,10 +57,12 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest {
         QueueMessageSerialization queueMessageSerialization =
                 getInjector().getInstance( QueueMessageSerialization.class );
 
-        Shard shard1 = new Shard("test", "region", Shard.Type.DEFAULT, 1L, null);
-        Shard shard2 = new Shard("test", "region", Shard.Type.DEFAULT, 2L, null);
-        Shard shard3 = new Shard("test", "region", Shard.Type.DEFAULT, 3L, null);
-        Shard shard4 = new Shard("test", "region", Shard.Type.DEFAULT, 4L, null);
+        String queueName = "queue_msit_" + RandomStringUtils.randomAlphanumeric( 10 );
+
+        Shard shard1 = new Shard(queueName, "region", Shard.Type.DEFAULT, 1L, null);
+        Shard shard2 = new Shard(queueName, "region", Shard.Type.DEFAULT, 2L, null);
+        Shard shard3 = new Shard(queueName, "region", Shard.Type.DEFAULT, 3L, null);
+        Shard shard4 = new Shard(queueName, "region", Shard.Type.DEFAULT, 4L, null);
 
         shardSerialization.createShard(shard1);
         shardSerialization.createShard(shard2);
@@ -72,7 +75,7 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest {
         for(int i=0; i < numMessagesPerShard; i++){
 
             queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(),
-                    DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard1.getShardId(),
+                    DatabaseQueueMessage.Type.DEFAULT, queueName, "region", shard1.getShardId(),
                     System.currentTimeMillis(), null, null));
             Thread.sleep(3);
         }
@@ -80,7 +83,7 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest {
         for(int i=0; i < numMessagesPerShard; i++){
 
             queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(),
-                    DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard2.getShardId(),
+                    DatabaseQueueMessage.Type.DEFAULT, queueName, "region", shard2.getShardId(),
                     System.currentTimeMillis(), null, null));
             Thread.sleep(3);
         }
@@ -88,7 +91,7 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest {
         for(int i=0; i < numMessagesPerShard; i++){
 
             queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(),
-                    DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard3.getShardId(),
+                    DatabaseQueueMessage.Type.DEFAULT, queueName, "region", shard3.getShardId(),
                     System.currentTimeMillis(), null, null));
             Thread.sleep(3);
         }
@@ -96,16 +99,16 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest {
         for(int i=0; i < numMessagesPerShard; i++){
 
             queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(),
-                    DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard4.getShardId(),
+                    DatabaseQueueMessage.Type.DEFAULT, queueName, "region", shard4.getShardId(),
                     System.currentTimeMillis(), null, null));
             Thread.sleep(3);
         }
 
 
         ShardIterator shardIterator = new ShardIterator(
-                cassandraClient, "test", "region", Shard.Type.DEFAULT, Optional.empty());
+                cassandraClient, queueName, "region", Shard.Type.DEFAULT, Optional.empty());
         MultiShardMessageIterator iterator = new MultiShardMessageIterator(
-                cassandraClient, "test", "region", DatabaseQueueMessage.Type.DEFAULT, shardIterator, null);
+                cassandraClient, queueName, "region", DatabaseQueueMessage.Type.DEFAULT, shardIterator, null);
 
         final AtomicInteger[] counts = {
                 new AtomicInteger(0), new AtomicInteger(0), new AtomicInteger(0), new AtomicInteger(0) };

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
index fb0a46e..0d593aa 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
@@ -50,14 +50,16 @@ public class ShardIteratorTest extends AbstractTest {
         CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
         ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
 
-        Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
-        Shard shard2 = new Shard("test", "region1", Shard.Type.DEFAULT, 200L, null);
+        String queueName = "queue_sit_" + RandomStringUtils.randomAlphanumeric( 10 );
+
+        Shard shard1 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 100L, null);
+        Shard shard2 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 200L, null);
 
         shardSerialization.createShard(shard1);
         shardSerialization.createShard(shard2);
 
         Iterator<Shard> shardIterator = new ShardIterator(
-                cassandraClient, "test", "region1", Shard.Type.DEFAULT, Optional.empty());
+                cassandraClient, queueName, "region1", Shard.Type.DEFAULT, Optional.empty());
 
         List<Shard> shards = new ArrayList<>(1);
 
@@ -81,9 +83,11 @@ public class ShardIteratorTest extends AbstractTest {
         CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
         ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
 
-        Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
-        Shard shard2 = new Shard("test", "region1", Shard.Type.DEFAULT, 200L, null);
-        Shard shard3 = new Shard("test", "region1", Shard.Type.DEFAULT, 300L, null);
+        String queueName = "queue_sit_" + RandomStringUtils.randomAlphanumeric( 10 );
+
+        Shard shard1 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 100L, null);
+        Shard shard2 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 200L, null);
+        Shard shard3 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 300L, null);
 
         shardSerialization.createShard(shard1);
         shardSerialization.createShard(shard2);
@@ -91,7 +95,7 @@ public class ShardIteratorTest extends AbstractTest {
 
 
         Iterator<Shard> shardIterator = new ShardIterator(
-                cassandraClient, "test", "region1", Shard.Type.DEFAULT, Optional.of(200L));
+                cassandraClient, queueName, "region1", Shard.Type.DEFAULT, Optional.of(200L));
 
         List<Shard> shards = new ArrayList<>(1);
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/log4j.properties b/stack/corepersistence/queue/src/test/resources/log4j.properties
index 3c679f5..9e14f29 100644
--- a/stack/corepersistence/queue/src/test/resources/log4j.properties
+++ b/stack/corepersistence/queue/src/test/resources/log4j.properties
@@ -27,3 +27,6 @@ log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
 
 log4j.logger.org.apache.cassandra=WARN
 log4j.logger.org.glassfish=WARN
+
+log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
+

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index 9140637..dc7ef48 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -34,21 +34,15 @@ usergrid.cluster.seeds=us-east:localhost
 # Port used for cluster communications.
 usergrid.cluster.port=2551
 
-queue.sender.num.actors=20
-queue.writer.num.actors=20
-queue.num.actors=20
+queue.writer.num.actors=100
 
 # set shard size and times low for testing purposes
 queue.shard.max.size=500
-queue.shard.allocation.check.frequency.millis=1000
-queue.shard.allocation.advance.time.millis=2000
-queue.refresh.millis=1000
+queue.shard.allocation.check.frequency.millis=100
+queue.shard.allocation.advance.time.millis=200
 
 queue.max.inmemory.shard.counter = 100
 
-cassandra.connections=10
-#cassandra.timeout=20000
-
 cassandra.hosts=localhost
 
 cassandra.keyspace.application=qakka_test_application