You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/03 14:49:40 UTC

[1/2] usergrid git commit: Fixes to get tests running consistently.

Repository: usergrid
Updated Branches:
  refs/heads/usergrid-1318-queue 5a19ba9a7 -> 4d45d1c4d


Fixes to get tests running consistently.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/781b894a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/781b894a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/781b894a

Branch: refs/heads/usergrid-1318-queue
Commit: 781b894a65851e4e8a4ff2fac84b644e5b72c12d
Parents: 5a19ba9
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Oct 3 10:45:22 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Oct 3 10:45:22 2016 -0400

----------------------------------------------------------------------
 .../qakka/core/impl/QueueManagerImpl.java       | 36 +++++++++++++-------
 .../qakka/distributed/actors/QueueActor.java    |  2 +-
 .../queue/impl/QakkaQueueManager.java           |  8 ++---
 .../qakka/api/QueueResourceTest.java            | 12 +++++++
 .../qakka/core/QueueMessageManagerTest.java     |  2 +-
 .../distributed/QueueActorServiceTest.java      |  1 +
 .../distributed/actors/ShardAllocatorTest.java  |  2 +-
 .../queue/LegacyQueueManagerTest.java           |  9 ++---
 8 files changed, 43 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
index 789edd4..a8139a1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.persistence.qakka.core.impl;
 
 import com.google.inject.Inject;
+import com.google.inject.spi.Message;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
 import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
@@ -27,6 +28,8 @@ import org.apache.usergrid.persistence.qakka.core.Queue;
 import org.apache.usergrid.persistence.qakka.core.QueueManager;
 import org.apache.usergrid.persistence.qakka.core.Regions;
 import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queues.DatabaseQueue;
 import org.apache.usergrid.persistence.qakka.serialization.queues.QueueSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
@@ -37,23 +40,26 @@ import java.util.List;
 
 
 public class QueueManagerImpl implements QueueManager {
-    private final ActorSystemFig          actorSystemFig;
-    private final QueueSerialization queueSerialization;
-    private final DistributedQueueService distributedQueueService;
-    private final ShardSerialization      shardSerialization;
+    private final ActorSystemFig              actorSystemFig;
+    private final QueueSerialization          queueSerialization;
+    private final DistributedQueueService     distributedQueueService;
+    private final ShardSerialization          shardSerialization;
+    private final MessageCounterSerialization messageCounterSerialization;
 
 
     @Inject
     public QueueManagerImpl(
-            ActorSystemFig          actorSystemFig,
-            QueueSerialization      queueSerialization,
-            DistributedQueueService distributedQueueService,
-            ShardSerialization      shardSerialization ) {
-
-        this.actorSystemFig          = actorSystemFig;
-        this.queueSerialization      = queueSerialization;
-        this.distributedQueueService = distributedQueueService;
-        this.shardSerialization      = shardSerialization;
+        ActorSystemFig              actorSystemFig,
+        QueueSerialization          queueSerialization,
+        DistributedQueueService     distributedQueueService,
+        ShardSerialization          shardSerialization,
+        MessageCounterSerialization messageCounterSerialization) {
+
+        this.actorSystemFig              = actorSystemFig;
+        this.queueSerialization          = queueSerialization;
+        this.distributedQueueService     = distributedQueueService;
+        this.shardSerialization          = shardSerialization;
+        this.messageCounterSerialization = messageCounterSerialization;
     }
 
     @Override
@@ -87,6 +93,10 @@ public class QueueManagerImpl implements QueueManager {
         // only write the existence of a queue to the database if its dependent initial shards have been written
         queueSerialization.writeQueue(queue.toDatabaseQueue());
 
+        // init counters
+        messageCounterSerialization.incrementCounter( queue.getName(), DatabaseQueueMessage.Type.DEFAULT, 0L );
+        messageCounterSerialization.incrementCounter( queue.getName(), DatabaseQueueMessage.Type.INFLIGHT, 0L );
+
         distributedQueueService.initQueue( queue.getName() );
         distributedQueueService.refreshQueue( queue.getName() );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 5ebba3d..c706f7d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -190,7 +190,7 @@ public class QueueActor extends UntypedActor {
 
                 while (queueMessages.size() < queueGetRequest.getNumRequested()) {
 
-                    DatabaseQueueMessage queueMessage = inMemoryQueue.peek( queueGetRequest.getQueueName() );
+                    DatabaseQueueMessage queueMessage = inMemoryQueue.poll( queueGetRequest.getQueueName() );
 
                     if (queueMessage != null) {
                         if (queueActorHelper.putInflight( queueGetRequest.getQueueName(), queueMessage )) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
index f3cae86..4d81a64 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
@@ -60,6 +60,8 @@ public class QakkaQueueManager implements LegacyQueueManager {
         this.queueManager = queueManager;
         this.queueMessageManager = queueMessageManager;
         this.regions = regions;
+
+        createQueueIfNecessary();
     }
 
 
@@ -79,8 +81,6 @@ public class QakkaQueueManager implements LegacyQueueManager {
     @Override
     public <T extends Serializable> void sendMessage(T body) throws IOException {
 
-        createQueueIfNecessary();
-
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(bos);
         oos.writeObject(body);
@@ -107,8 +107,6 @@ public class QakkaQueueManager implements LegacyQueueManager {
     @Override
     public List<LegacyQueueMessage> getMessages(int limit, Class klass) {
 
-        createQueueIfNecessary();
-
         List<LegacyQueueMessage> messages = new ArrayList<>();
         List<QueueMessage> qakkaMessages = queueMessageManager.getNextMessages( scope.getName(), limit );
 
@@ -149,8 +147,6 @@ public class QakkaQueueManager implements LegacyQueueManager {
     @Override
     public void commitMessage(LegacyQueueMessage queueMessage) {
 
-        createQueueIfNecessary();
-
         UUID queueMessageId  = UUID.fromString( queueMessage.getMessageId() );
         queueMessageManager.ackMessage( scope.getName(), queueMessageId );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
index fcb4212..c05281c 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
@@ -76,6 +76,9 @@ public class QueueResourceTest extends AbstractRestTest {
         Assert.assertFalse( apiResponse.getQueues().isEmpty() );
         Assert.assertEquals( 1, apiResponse.getQueues().size() );
         Assert.assertEquals( queueName, apiResponse.getQueues().iterator().next().getName() );
+
+        response = target("queues").path( queueName ).queryParam( "confirm", true ).request().delete();
+        Assert.assertEquals( 200, response.getStatus() );
     }
 
 
@@ -182,6 +185,9 @@ public class QueueResourceTest extends AbstractRestTest {
         // get all messages, checking for dups
 
         checkJsonMessages( queueName, numMessages );
+
+        Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
+        Assert.assertEquals( 200, response.getStatus() );
     }
 
 
@@ -257,6 +263,9 @@ public class QueueResourceTest extends AbstractRestTest {
         // get all messages, checking for dups
 
         checkBinaryMessages( queueName, numMessages );
+
+        Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
+        Assert.assertEquals( 200, response.getStatus() );
     }
 
 
@@ -373,6 +382,9 @@ public class QueueResourceTest extends AbstractRestTest {
         // and, those same messages should be available again in the queue
 
         checkJsonMessages( queueName, numMessages/2 );
+
+        response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
+        Assert.assertEquals( 200, response.getStatus() );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index 8ce9822..9e0a9d8 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -180,7 +180,7 @@ public class QueueMessageManagerTest extends AbstractTest {
             int retries = 0;
             while (retries++ < maxRetries) {
                 distributedQueueService.refresh();
-                if (inMemoryQueue.size( queueName ) == 40) {
+                if (qmm.getQueueDepth( queueName ) == 40) {
                     break;
                 }
                 Thread.sleep( 500 );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index 53f9224..a5c95bd 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -166,6 +166,7 @@ public class QueueActorServiceTest extends AbstractTest {
                     count = 100;
                     break;
                 }
+                count = inMemoryQueue.size( queueName );
                 Thread.sleep( 1000 );
             }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
index b602177..c6831b7 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
@@ -203,10 +203,10 @@ public class ShardAllocatorTest extends AbstractTest {
                     null, // expiration
                     "application/json",
                     DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) );
+                Thread.sleep( 10 );
             }
 
             distributedQueueService.refresh();
-            Thread.sleep( 3000 );
 
             // Test that 8 shards were created
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
index 5800bba..65c3309 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
@@ -57,8 +57,6 @@ public class LegacyQueueManagerTest extends AbstractTest {
 
         Injector myInjector = getInjector();
 
-        CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class );
-
         ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class );
         String region = actorSystemFig.getRegionLocal();
 
@@ -83,7 +81,7 @@ public class LegacyQueueManagerTest extends AbstractTest {
         }
 
         messageList = qm.getMessages(1, String.class);
-        assertTrue(messageList.size() <= 0);
+        assertEquals( 0, messageList.size() );
 
         DistributedQueueService distributedQueueService = myInjector.getInstance( DistributedQueueService.class );
         distributedQueueService.shutdown();
@@ -125,20 +123,17 @@ public class LegacyQueueManagerTest extends AbstractTest {
         qm.commitMessages(messageList);
 
         messageList = qm.getMessages(1, values.getClass());
-        assertTrue(messageList.size() <= 0);
+        assertEquals( 0, messageList.size());
 
         DistributedQueueService distributedQueueService = myInjector.getInstance( DistributedQueueService.class );
         distributedQueueService.shutdown();
     }
 
     @Test
-    @Ignore("Not implemented yet")
     public void queueSize() throws Exception{
 
         Injector myInjector = getInjector();
 
-        CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class );
-
         ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class );
         String region = actorSystemFig.getRegionLocal();
 


[2/2] usergrid git commit: Fixes to get tests running consistently.

Posted by sn...@apache.org.
Fixes to get tests running consistently.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4d45d1c4
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4d45d1c4
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4d45d1c4

Branch: refs/heads/usergrid-1318-queue
Commit: 4d45d1c4d4cbc6e921f94b69593ee3374013c711
Parents: 781b894
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Oct 3 10:49:24 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Oct 3 10:49:24 2016 -0400

----------------------------------------------------------------------
 .../usergrid/persistence/qakka/api/QueueResource.java  | 13 +++++--------
 .../persistence/qakka/api/QueueResourceTest.java       | 10 +++++-----
 .../queue/src/test/resources/qakka.properties          |  2 ++
 3 files changed, 12 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/4d45d1c4/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
index f82661c..10dae04 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
@@ -44,12 +44,11 @@ import java.util.UUID;
 public class QueueResource {
     private static final Logger logger = LoggerFactory.getLogger( QueueResource.class );
 
-    private final QueueManager queueManager;
+    private final QueueManager        queueManager;
     private final QueueMessageManager queueMessageManager;
-    private final MetricsService            metricsService;
-    private final URIStrategy               uriStrategy;
-    private final Regions regions;
-    private final ShardCounterSerialization shardCounterSerialization;
+    private final MetricsService      metricsService;
+    private final URIStrategy         uriStrategy;
+    private final Regions             regions;
 
 
     @Inject
@@ -58,15 +57,13 @@ public class QueueResource {
             QueueMessageManager       queueMessageManager,
             MetricsService            metricsService,
             URIStrategy               uriStrategy,
-            Regions                   regions,
-            ShardCounterSerialization shardCounterSerialization ) {
+            Regions                   regions ) {
 
         this.queueManager              = queueManager;
         this.queueMessageManager       = queueMessageManager;
         this.metricsService            = metricsService;
         this.uriStrategy               = uriStrategy;
         this.regions                   = regions;
-        this.shardCounterSerialization = shardCounterSerialization;
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4d45d1c4/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
index c05281c..d723e97 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
@@ -56,7 +56,7 @@ public class QueueResourceTest extends AbstractRestTest {
 
         // create a queue
 
-        String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        String queueName = "qrt_create_" + RandomStringUtils.randomAlphanumeric( 10 );
         Map<String, Object> queueMap = new HashMap<String, Object>() {{
             put("name", queueName);
         }};
@@ -87,7 +87,7 @@ public class QueueResourceTest extends AbstractRestTest {
 
         // create a queue
 
-        String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        String queueName = "qrt_delete_" + RandomStringUtils.randomAlphanumeric( 10 );
         Map<String, Object> queueMap = new HashMap<String, Object>() {{ put("name", queueName); }};
         Response response = target("queues").request()
                 .post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE));
@@ -149,7 +149,7 @@ public class QueueResourceTest extends AbstractRestTest {
 
         // create a queue
 
-        String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        String queueName = "qrt_json_" + RandomStringUtils.randomAlphanumeric( 10 );
         Map<String, Object> queueMap = new HashMap<String, Object>() {{
             put( "name", queueName );
         }};
@@ -238,7 +238,7 @@ public class QueueResourceTest extends AbstractRestTest {
 
         // create a queue
 
-        String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        String queueName = "qrt_binarty_" + RandomStringUtils.randomAlphanumeric( 10 );
         Map<String, Object> queueMap = new HashMap<String, Object>() {{
             put( "name", queueName );
         }};
@@ -316,7 +316,7 @@ public class QueueResourceTest extends AbstractRestTest {
 
         // create a queue
 
-        String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        String queueName = "qrt_timeout_" + RandomStringUtils.randomAlphanumeric( 10 );
         Map<String, Object> queueMap = new HashMap<String, Object>() {{ put("name", queueName); }};
         target("queues").request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE));
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4d45d1c4/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index fb46f3d..ef1be80 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -36,6 +36,8 @@ usergrid.cluster.seeds=us-east:localhost
 # Port used for cluster communications.
 usergrid.cluster.port=2551
 
+queue.num.actors=50
+queue.sender.num.actors=100
 queue.writer.num.actors=100
 
 # set shard size and times low for testing purposes