You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/16 22:34:50 UTC
[21/25] usergrid git commit: Fixes to get all tests passing again.
Fixes to get all tests passing again.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/99dbfc2d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/99dbfc2d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/99dbfc2d
Branch: refs/heads/usergrid-1318-queue
Commit: 99dbfc2d17e330583560d386b6ecb5cce93fa3e5
Parents: 447b60d
Author: Dave Johnson <sn...@apache.org>
Authored: Fri Sep 16 09:39:51 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Fri Sep 16 09:39:51 2016 -0400
----------------------------------------------------------------------
.../usergrid/persistence/core/CassandraFig.java | 12 ++++++++++
.../core/datastax/impl/DataStaxClusterImpl.java | 2 +-
.../persistence/qakka/AbstractTest.java | 2 +-
.../qakka/core/QueueMessageManagerTest.java | 2 --
.../distributed/QueueActorServiceTest.java | 1 +
.../distributed/actors/QueueTimeouterTest.java | 1 -
.../distributed/actors/ShardAllocatorTest.java | 1 -
...tiShardDatabaseQueueMessageIteratorTest.java | 23 +++++++++++---------
.../sharding/ShardIteratorTest.java | 18 +++++++++------
.../queue/src/test/resources/log4j.properties | 3 +++
.../queue/src/test/resources/qakka.properties | 12 +++-------
11 files changed, 45 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
index 90f4ae8..b599a20 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
@@ -35,9 +35,13 @@ public interface CassandraFig extends GuicyFig {
String READ_CL = "cassandra.readcl";
String READ_CL_CONSISTENT = "cassandra.readcl.consistent";
String WRITE_CL = "cassandra.writecl";
+
String STRATEGY = "cassandra.strategy";
String STRATEGY_OPTIONS = "cassandra.strategy.options";
+ String STRATEGY_LOCAL = "cassandra.strategy.local";
+ String STRATEGY_OPTIONS_LOCAL = "cassandra.strategy.options.local";
+
// main application cassandra properties
String ASTYANAX_READ_CONSISTENT_CL = "usergrid.consistent.read.cl";
String ASTYANAX_READ_CL = "usergrid.read.cl";
@@ -157,6 +161,14 @@ public interface CassandraFig extends GuicyFig {
@Key( STRATEGY_OPTIONS )
String getStrategyOptions();
+ @Default("SimpleStrategy")
+ @Key( STRATEGY_LOCAL )
+ String getStrategyLocal();
+
+ @Default("replication_factor:1")
+ @Key( STRATEGY_OPTIONS_LOCAL )
+ String getStrategyOptionsLocal();
+
/**
* Return the history of all shard values which are immutable. For instance, if shard values
* are initially set to 20 (the default) then increased to 40, the property should contain the string of
http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
index fe9803d..c8ddf3e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
@@ -148,7 +148,7 @@ public class DataStaxClusterImpl implements DataStaxCluster {
final String createQueueMessageKeyspace = String.format(
"CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s",
CQLUtils.quote(cassandraFig.getApplicationLocalKeyspace()),
- CQLUtils.getFormattedReplication(cassandraFig.getStrategy(), cassandraFig.getStrategyOptions())
+ CQLUtils.getFormattedReplication(cassandraFig.getStrategyLocal(), cassandraFig.getStrategyOptionsLocal())
);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java
index 6f1c744..c90db2e 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java
@@ -40,7 +40,7 @@ public class AbstractTest {
protected static Injector sharedInjector;
- AtomicBoolean migrated = new AtomicBoolean( false );
+ static AtomicBoolean migrated = new AtomicBoolean( false );
static { new KeyspaceDropper(); }
http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index 630c953..124cb86 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -134,7 +134,6 @@ public class QueueMessageManagerTest extends AbstractTest {
@Test
- @Ignore
public void testQueueMessageTimeouts() throws Exception {
Injector injector = getInjector();
@@ -223,7 +222,6 @@ public class QueueMessageManagerTest extends AbstractTest {
@Test
- @Ignore
public void testGetWithMissingData() throws InterruptedException {
Injector injector = getInjector();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index a46c186..0883650 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -156,6 +156,7 @@ public class QueueActorServiceTest extends AbstractTest {
int count = 0;
while ( retries++ < maxRetries ) {
Thread.sleep( 1000 );
+ distributedQueueService.refresh();
if (inMemoryQueue.size( queueName ) == 100) {
count = 100;
break;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
index 54f9d42..e3541a4 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
@@ -54,7 +54,6 @@ public class QueueTimeouterTest extends AbstractTest {
@Test
- @Ignore
public void testBasicOperation() throws Exception {
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
index ae62c89..7fd664f 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
@@ -163,7 +163,6 @@ public class ShardAllocatorTest extends AbstractTest {
@Test
- @Ignore
public void testBasicOperationWithMessages() throws InterruptedException {
Injector injector = getInjector();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
index 2d8da6d..5fa3434 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
@@ -19,6 +19,7 @@
package org.apache.usergrid.persistence.qakka.serialization;
+import org.apache.commons.lang.RandomStringUtils;
import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
import org.apache.usergrid.persistence.qakka.AbstractTest;
@@ -56,10 +57,12 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest {
QueueMessageSerialization queueMessageSerialization =
getInjector().getInstance( QueueMessageSerialization.class );
- Shard shard1 = new Shard("test", "region", Shard.Type.DEFAULT, 1L, null);
- Shard shard2 = new Shard("test", "region", Shard.Type.DEFAULT, 2L, null);
- Shard shard3 = new Shard("test", "region", Shard.Type.DEFAULT, 3L, null);
- Shard shard4 = new Shard("test", "region", Shard.Type.DEFAULT, 4L, null);
+ String queueName = "queue_msit_" + RandomStringUtils.randomAlphanumeric( 10 );
+
+ Shard shard1 = new Shard(queueName, "region", Shard.Type.DEFAULT, 1L, null);
+ Shard shard2 = new Shard(queueName, "region", Shard.Type.DEFAULT, 2L, null);
+ Shard shard3 = new Shard(queueName, "region", Shard.Type.DEFAULT, 3L, null);
+ Shard shard4 = new Shard(queueName, "region", Shard.Type.DEFAULT, 4L, null);
shardSerialization.createShard(shard1);
shardSerialization.createShard(shard2);
@@ -72,7 +75,7 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest {
for(int i=0; i < numMessagesPerShard; i++){
queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(),
- DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard1.getShardId(),
+ DatabaseQueueMessage.Type.DEFAULT, queueName, "region", shard1.getShardId(),
System.currentTimeMillis(), null, null));
Thread.sleep(3);
}
@@ -80,7 +83,7 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest {
for(int i=0; i < numMessagesPerShard; i++){
queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(),
- DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard2.getShardId(),
+ DatabaseQueueMessage.Type.DEFAULT, queueName, "region", shard2.getShardId(),
System.currentTimeMillis(), null, null));
Thread.sleep(3);
}
@@ -88,7 +91,7 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest {
for(int i=0; i < numMessagesPerShard; i++){
queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(),
- DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard3.getShardId(),
+ DatabaseQueueMessage.Type.DEFAULT, queueName, "region", shard3.getShardId(),
System.currentTimeMillis(), null, null));
Thread.sleep(3);
}
@@ -96,16 +99,16 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest {
for(int i=0; i < numMessagesPerShard; i++){
queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(),
- DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard4.getShardId(),
+ DatabaseQueueMessage.Type.DEFAULT, queueName, "region", shard4.getShardId(),
System.currentTimeMillis(), null, null));
Thread.sleep(3);
}
ShardIterator shardIterator = new ShardIterator(
- cassandraClient, "test", "region", Shard.Type.DEFAULT, Optional.empty());
+ cassandraClient, queueName, "region", Shard.Type.DEFAULT, Optional.empty());
MultiShardMessageIterator iterator = new MultiShardMessageIterator(
- cassandraClient, "test", "region", DatabaseQueueMessage.Type.DEFAULT, shardIterator, null);
+ cassandraClient, queueName, "region", DatabaseQueueMessage.Type.DEFAULT, shardIterator, null);
final AtomicInteger[] counts = {
new AtomicInteger(0), new AtomicInteger(0), new AtomicInteger(0), new AtomicInteger(0) };
http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
index fb0a46e..0d593aa 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
@@ -50,14 +50,16 @@ public class ShardIteratorTest extends AbstractTest {
CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
- Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
- Shard shard2 = new Shard("test", "region1", Shard.Type.DEFAULT, 200L, null);
+ String queueName = "queue_sit_" + RandomStringUtils.randomAlphanumeric( 10 );
+
+ Shard shard1 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 100L, null);
+ Shard shard2 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 200L, null);
shardSerialization.createShard(shard1);
shardSerialization.createShard(shard2);
Iterator<Shard> shardIterator = new ShardIterator(
- cassandraClient, "test", "region1", Shard.Type.DEFAULT, Optional.empty());
+ cassandraClient, queueName, "region1", Shard.Type.DEFAULT, Optional.empty());
List<Shard> shards = new ArrayList<>(1);
@@ -81,9 +83,11 @@ public class ShardIteratorTest extends AbstractTest {
CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
- Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
- Shard shard2 = new Shard("test", "region1", Shard.Type.DEFAULT, 200L, null);
- Shard shard3 = new Shard("test", "region1", Shard.Type.DEFAULT, 300L, null);
+ String queueName = "queue_sit_" + RandomStringUtils.randomAlphanumeric( 10 );
+
+ Shard shard1 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 100L, null);
+ Shard shard2 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 200L, null);
+ Shard shard3 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 300L, null);
shardSerialization.createShard(shard1);
shardSerialization.createShard(shard2);
@@ -91,7 +95,7 @@ public class ShardIteratorTest extends AbstractTest {
Iterator<Shard> shardIterator = new ShardIterator(
- cassandraClient, "test", "region1", Shard.Type.DEFAULT, Optional.of(200L));
+ cassandraClient, queueName, "region1", Shard.Type.DEFAULT, Optional.of(200L));
List<Shard> shards = new ArrayList<>(1);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/log4j.properties b/stack/corepersistence/queue/src/test/resources/log4j.properties
index 3c679f5..9e14f29 100644
--- a/stack/corepersistence/queue/src/test/resources/log4j.properties
+++ b/stack/corepersistence/queue/src/test/resources/log4j.properties
@@ -27,3 +27,6 @@ log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
log4j.logger.org.apache.cassandra=WARN
log4j.logger.org.glassfish=WARN
+
+log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
+
http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index 9140637..dc7ef48 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -34,21 +34,15 @@ usergrid.cluster.seeds=us-east:localhost
# Port used for cluster communications.
usergrid.cluster.port=2551
-queue.sender.num.actors=20
-queue.writer.num.actors=20
-queue.num.actors=20
+queue.writer.num.actors=100
# set shard size and times low for testing purposes
queue.shard.max.size=500
-queue.shard.allocation.check.frequency.millis=1000
-queue.shard.allocation.advance.time.millis=2000
-queue.refresh.millis=1000
+queue.shard.allocation.check.frequency.millis=100
+queue.shard.allocation.advance.time.millis=200
queue.max.inmemory.shard.counter = 100
-cassandra.connections=10
-#cassandra.timeout=20000
-
cassandra.hosts=localhost
cassandra.keyspace.application=qakka_test_application