You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/03 14:49:40 UTC
[1/2] usergrid git commit: Fixes to get tests running consistently.
Repository: usergrid
Updated Branches:
refs/heads/usergrid-1318-queue 5a19ba9a7 -> 4d45d1c4d
Fixes to get tests running consistently.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/781b894a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/781b894a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/781b894a
Branch: refs/heads/usergrid-1318-queue
Commit: 781b894a65851e4e8a4ff2fac84b644e5b72c12d
Parents: 5a19ba9
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Oct 3 10:45:22 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Oct 3 10:45:22 2016 -0400
----------------------------------------------------------------------
.../qakka/core/impl/QueueManagerImpl.java | 36 +++++++++++++-------
.../qakka/distributed/actors/QueueActor.java | 2 +-
.../queue/impl/QakkaQueueManager.java | 8 ++---
.../qakka/api/QueueResourceTest.java | 12 +++++++
.../qakka/core/QueueMessageManagerTest.java | 2 +-
.../distributed/QueueActorServiceTest.java | 1 +
.../distributed/actors/ShardAllocatorTest.java | 2 +-
.../queue/LegacyQueueManagerTest.java | 9 ++---
8 files changed, 43 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
index 789edd4..a8139a1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.persistence.qakka.core.impl;
import com.google.inject.Inject;
+import com.google.inject.spi.Message;
import org.apache.commons.lang3.StringUtils;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
@@ -27,6 +28,8 @@ import org.apache.usergrid.persistence.qakka.core.Queue;
import org.apache.usergrid.persistence.qakka.core.QueueManager;
import org.apache.usergrid.persistence.qakka.core.Regions;
import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queues.DatabaseQueue;
import org.apache.usergrid.persistence.qakka.serialization.queues.QueueSerialization;
import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
@@ -37,23 +40,26 @@ import java.util.List;
public class QueueManagerImpl implements QueueManager {
- private final ActorSystemFig actorSystemFig;
- private final QueueSerialization queueSerialization;
- private final DistributedQueueService distributedQueueService;
- private final ShardSerialization shardSerialization;
+ private final ActorSystemFig actorSystemFig;
+ private final QueueSerialization queueSerialization;
+ private final DistributedQueueService distributedQueueService;
+ private final ShardSerialization shardSerialization;
+ private final MessageCounterSerialization messageCounterSerialization;
@Inject
public QueueManagerImpl(
- ActorSystemFig actorSystemFig,
- QueueSerialization queueSerialization,
- DistributedQueueService distributedQueueService,
- ShardSerialization shardSerialization ) {
-
- this.actorSystemFig = actorSystemFig;
- this.queueSerialization = queueSerialization;
- this.distributedQueueService = distributedQueueService;
- this.shardSerialization = shardSerialization;
+ ActorSystemFig actorSystemFig,
+ QueueSerialization queueSerialization,
+ DistributedQueueService distributedQueueService,
+ ShardSerialization shardSerialization,
+ MessageCounterSerialization messageCounterSerialization) {
+
+ this.actorSystemFig = actorSystemFig;
+ this.queueSerialization = queueSerialization;
+ this.distributedQueueService = distributedQueueService;
+ this.shardSerialization = shardSerialization;
+ this.messageCounterSerialization = messageCounterSerialization;
}
@Override
@@ -87,6 +93,10 @@ public class QueueManagerImpl implements QueueManager {
// only write the existence of a queue to the database if its dependent initial shards have been written
queueSerialization.writeQueue(queue.toDatabaseQueue());
+ // init counters
+ messageCounterSerialization.incrementCounter( queue.getName(), DatabaseQueueMessage.Type.DEFAULT, 0L );
+ messageCounterSerialization.incrementCounter( queue.getName(), DatabaseQueueMessage.Type.INFLIGHT, 0L );
+
distributedQueueService.initQueue( queue.getName() );
distributedQueueService.refreshQueue( queue.getName() );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 5ebba3d..c706f7d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -190,7 +190,7 @@ public class QueueActor extends UntypedActor {
while (queueMessages.size() < queueGetRequest.getNumRequested()) {
- DatabaseQueueMessage queueMessage = inMemoryQueue.peek( queueGetRequest.getQueueName() );
+ DatabaseQueueMessage queueMessage = inMemoryQueue.poll( queueGetRequest.getQueueName() );
if (queueMessage != null) {
if (queueActorHelper.putInflight( queueGetRequest.getQueueName(), queueMessage )) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
index f3cae86..4d81a64 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
@@ -60,6 +60,8 @@ public class QakkaQueueManager implements LegacyQueueManager {
this.queueManager = queueManager;
this.queueMessageManager = queueMessageManager;
this.regions = regions;
+
+ createQueueIfNecessary();
}
@@ -79,8 +81,6 @@ public class QakkaQueueManager implements LegacyQueueManager {
@Override
public <T extends Serializable> void sendMessage(T body) throws IOException {
- createQueueIfNecessary();
-
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(body);
@@ -107,8 +107,6 @@ public class QakkaQueueManager implements LegacyQueueManager {
@Override
public List<LegacyQueueMessage> getMessages(int limit, Class klass) {
- createQueueIfNecessary();
-
List<LegacyQueueMessage> messages = new ArrayList<>();
List<QueueMessage> qakkaMessages = queueMessageManager.getNextMessages( scope.getName(), limit );
@@ -149,8 +147,6 @@ public class QakkaQueueManager implements LegacyQueueManager {
@Override
public void commitMessage(LegacyQueueMessage queueMessage) {
- createQueueIfNecessary();
-
UUID queueMessageId = UUID.fromString( queueMessage.getMessageId() );
queueMessageManager.ackMessage( scope.getName(), queueMessageId );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
index fcb4212..c05281c 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
@@ -76,6 +76,9 @@ public class QueueResourceTest extends AbstractRestTest {
Assert.assertFalse( apiResponse.getQueues().isEmpty() );
Assert.assertEquals( 1, apiResponse.getQueues().size() );
Assert.assertEquals( queueName, apiResponse.getQueues().iterator().next().getName() );
+
+ response = target("queues").path( queueName ).queryParam( "confirm", true ).request().delete();
+ Assert.assertEquals( 200, response.getStatus() );
}
@@ -182,6 +185,9 @@ public class QueueResourceTest extends AbstractRestTest {
// get all messages, checking for dups
checkJsonMessages( queueName, numMessages );
+
+ Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
+ Assert.assertEquals( 200, response.getStatus() );
}
@@ -257,6 +263,9 @@ public class QueueResourceTest extends AbstractRestTest {
// get all messages, checking for dups
checkBinaryMessages( queueName, numMessages );
+
+ Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
+ Assert.assertEquals( 200, response.getStatus() );
}
@@ -373,6 +382,9 @@ public class QueueResourceTest extends AbstractRestTest {
// and, those same messages should be available again in the queue
checkJsonMessages( queueName, numMessages/2 );
+
+ response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
+ Assert.assertEquals( 200, response.getStatus() );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index 8ce9822..9e0a9d8 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -180,7 +180,7 @@ public class QueueMessageManagerTest extends AbstractTest {
int retries = 0;
while (retries++ < maxRetries) {
distributedQueueService.refresh();
- if (inMemoryQueue.size( queueName ) == 40) {
+ if (qmm.getQueueDepth( queueName ) == 40) {
break;
}
Thread.sleep( 500 );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index 53f9224..a5c95bd 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -166,6 +166,7 @@ public class QueueActorServiceTest extends AbstractTest {
count = 100;
break;
}
+ count = inMemoryQueue.size( queueName );
Thread.sleep( 1000 );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
index b602177..c6831b7 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
@@ -203,10 +203,10 @@ public class ShardAllocatorTest extends AbstractTest {
null, // expiration
"application/json",
DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) );
+ Thread.sleep( 10 );
}
distributedQueueService.refresh();
- Thread.sleep( 3000 );
// Test that 8 shards were created
http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
index 5800bba..65c3309 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
@@ -57,8 +57,6 @@ public class LegacyQueueManagerTest extends AbstractTest {
Injector myInjector = getInjector();
- CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class );
-
ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class );
String region = actorSystemFig.getRegionLocal();
@@ -83,7 +81,7 @@ public class LegacyQueueManagerTest extends AbstractTest {
}
messageList = qm.getMessages(1, String.class);
- assertTrue(messageList.size() <= 0);
+ assertEquals( 0, messageList.size() );
DistributedQueueService distributedQueueService = myInjector.getInstance( DistributedQueueService.class );
distributedQueueService.shutdown();
@@ -125,20 +123,17 @@ public class LegacyQueueManagerTest extends AbstractTest {
qm.commitMessages(messageList);
messageList = qm.getMessages(1, values.getClass());
- assertTrue(messageList.size() <= 0);
+ assertEquals( 0, messageList.size());
DistributedQueueService distributedQueueService = myInjector.getInstance( DistributedQueueService.class );
distributedQueueService.shutdown();
}
@Test
- @Ignore("Not implemented yet")
public void queueSize() throws Exception{
Injector myInjector = getInjector();
- CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class );
-
ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class );
String region = actorSystemFig.getRegionLocal();
[2/2] usergrid git commit: Fixes to get tests running consistently.
Posted by sn...@apache.org.
Fixes to get tests running consistently.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4d45d1c4
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4d45d1c4
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4d45d1c4
Branch: refs/heads/usergrid-1318-queue
Commit: 4d45d1c4d4cbc6e921f94b69593ee3374013c711
Parents: 781b894
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Oct 3 10:49:24 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Oct 3 10:49:24 2016 -0400
----------------------------------------------------------------------
.../usergrid/persistence/qakka/api/QueueResource.java | 13 +++++--------
.../persistence/qakka/api/QueueResourceTest.java | 10 +++++-----
.../queue/src/test/resources/qakka.properties | 2 ++
3 files changed, 12 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4d45d1c4/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
index f82661c..10dae04 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
@@ -44,12 +44,11 @@ import java.util.UUID;
public class QueueResource {
private static final Logger logger = LoggerFactory.getLogger( QueueResource.class );
- private final QueueManager queueManager;
+ private final QueueManager queueManager;
private final QueueMessageManager queueMessageManager;
- private final MetricsService metricsService;
- private final URIStrategy uriStrategy;
- private final Regions regions;
- private final ShardCounterSerialization shardCounterSerialization;
+ private final MetricsService metricsService;
+ private final URIStrategy uriStrategy;
+ private final Regions regions;
@Inject
@@ -58,15 +57,13 @@ public class QueueResource {
QueueMessageManager queueMessageManager,
MetricsService metricsService,
URIStrategy uriStrategy,
- Regions regions,
- ShardCounterSerialization shardCounterSerialization ) {
+ Regions regions ) {
this.queueManager = queueManager;
this.queueMessageManager = queueMessageManager;
this.metricsService = metricsService;
this.uriStrategy = uriStrategy;
this.regions = regions;
- this.shardCounterSerialization = shardCounterSerialization;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4d45d1c4/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
index c05281c..d723e97 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
@@ -56,7 +56,7 @@ public class QueueResourceTest extends AbstractRestTest {
// create a queue
- String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+ String queueName = "qrt_create_" + RandomStringUtils.randomAlphanumeric( 10 );
Map<String, Object> queueMap = new HashMap<String, Object>() {{
put("name", queueName);
}};
@@ -87,7 +87,7 @@ public class QueueResourceTest extends AbstractRestTest {
// create a queue
- String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+ String queueName = "qrt_delete_" + RandomStringUtils.randomAlphanumeric( 10 );
Map<String, Object> queueMap = new HashMap<String, Object>() {{ put("name", queueName); }};
Response response = target("queues").request()
.post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE));
@@ -149,7 +149,7 @@ public class QueueResourceTest extends AbstractRestTest {
// create a queue
- String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+ String queueName = "qrt_json_" + RandomStringUtils.randomAlphanumeric( 10 );
Map<String, Object> queueMap = new HashMap<String, Object>() {{
put( "name", queueName );
}};
@@ -238,7 +238,7 @@ public class QueueResourceTest extends AbstractRestTest {
// create a queue
- String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+ String queueName = "qrt_binarty_" + RandomStringUtils.randomAlphanumeric( 10 );
Map<String, Object> queueMap = new HashMap<String, Object>() {{
put( "name", queueName );
}};
@@ -316,7 +316,7 @@ public class QueueResourceTest extends AbstractRestTest {
// create a queue
- String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+ String queueName = "qrt_timeout_" + RandomStringUtils.randomAlphanumeric( 10 );
Map<String, Object> queueMap = new HashMap<String, Object>() {{ put("name", queueName); }};
target("queues").request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE));
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4d45d1c4/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index fb46f3d..ef1be80 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -36,6 +36,8 @@ usergrid.cluster.seeds=us-east:localhost
# Port used for cluster communications.
usergrid.cluster.port=2551
+queue.num.actors=50
+queue.sender.num.actors=100
queue.writer.num.actors=100
# set shard size and times low for testing purposes