You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/30 21:18:50 UTC

[05/10] usergrid git commit: When writing to a queue, a refresh is performed if the in-memory queue is empty.

When writing to a queue, a refresh is performed if the in-memory queue is empty.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/727ff1d6
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/727ff1d6
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/727ff1d6

Branch: refs/heads/usergrid-1318-queue
Commit: 727ff1d642fdaf3768ab2b1f0c07364854ec2e60
Parents: 9306f12
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Sep 21 13:48:47 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Sep 21 13:48:47 2016 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/persistence/qakka/QakkaFig.java  |  2 +-
 .../persistence/qakka/core/impl/InMemoryQueue.java       |  4 ++++
 .../persistence/qakka/distributed/actors/QueueActor.java | 11 +++++++----
 .../qakka/distributed/actors/QueueWriter.java            |  3 +++
 .../distributed/impl/DistributedQueueServiceImpl.java    |  8 ++++++--
 .../qakka/distributed/messages/QueueRefreshRequest.java  |  8 +++++++-
 .../persistence/qakka/core/QueueMessageManagerTest.java  |  1 +
 .../qakka/distributed/actors/QueueReaderTest.java        |  2 +-
 8 files changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index c3f4189..da47c98 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -94,7 +94,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
 
     /** How often to refresh each queue's in-memory data */
     @Key(QUEUE_REFRESH_MILLISECONDS)
-    @Default("500")
+    @Default("1000")
     int getQueueRefreshMilliseconds();
 
     /** How many queue messages to keep in-memory */

http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
index 474ef5c..27de079 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
@@ -78,6 +78,10 @@ public class InMemoryQueue {
         return getQueue( queueName ).poll();
     }
 
+    public DatabaseQueueMessage peek( String queueName ) {
+        return getQueue( queueName ).peek();
+    }
+
     public int size( String queueName ) {
         return getQueue( queueName ).size();
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 3b50711..315975c 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -86,7 +86,7 @@ public class QueueActor extends UntypedActor {
                     Duration.create( 0, TimeUnit.MILLISECONDS),
                     Duration.create( qakkaFig.getQueueRefreshMilliseconds(), TimeUnit.MILLISECONDS),
                     self(),
-                    new QueueRefreshRequest( request.getQueueName() ),
+                    new QueueRefreshRequest( request.getQueueName(), false ),
                     getContext().dispatcher(),
                     getSelf());
                 refreshSchedulersByQueueName.put( request.getQueueName(), scheduler );
@@ -121,9 +121,12 @@ public class QueueActor extends UntypedActor {
             QueueRefreshRequest request = (QueueRefreshRequest)message;
 
             if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) {
-                ActorRef readerRef = getContext().actorOf( Props.create(
-                    QueueRefresher.class, request.getQueueName()), request.getQueueName() + "_reader");
-                queueReadersByQueueName.put( request.getQueueName(), readerRef );
+
+                if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) {
+                    ActorRef readerRef = getContext().actorOf( Props.create(
+                        QueueRefresher.class, request.getQueueName()), request.getQueueName() + "_reader");
+                    queueReadersByQueueName.put( request.getQueueName(), readerRef );
+                }
             }
 
             // hand-off to queue's reader

http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index 7166ef1..e54d916 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -25,6 +25,7 @@ import com.google.inject.Injector;
 import org.apache.usergrid.persistence.qakka.App;
 import org.apache.usergrid.persistence.qakka.MetricsService;
 import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResponse;
 import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
@@ -44,6 +45,7 @@ public class QueueWriter extends UntypedActor {
 
     public enum WriteStatus { SUCCESS_XFERLOG_DELETED, SUCCESS_XFERLOG_NOTDELETED, ERROR };
 
+    private final DistributedQueueService   distributedQueueService;
     private final QueueMessageSerialization messageSerialization;
     private final TransferLogSerialization  transferLogSerialization;
     private final AuditLogSerialization     auditLogSerialization;
@@ -61,6 +63,7 @@ public class QueueWriter extends UntypedActor {
         auditLogSerialization    = injector.getInstance( AuditLogSerialization.class );
         metricsService           = injector.getInstance( MetricsService.class );
 
+        distributedQueueService     = injector.getInstance( DistributedQueueService.class );
         messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index 3d6a808..9d71c31 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -25,7 +25,6 @@ import akka.util.Timeout;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import org.apache.log4j.net.SyslogAppender;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
 import org.apache.usergrid.persistence.actorsystem.ClientActor;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
@@ -114,7 +113,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     @Override
     public void refreshQueue(String queueName) {
         logger.info("{} Requesting refresh for queue: {}", this, queueName);
-        QueueRefreshRequest request = new QueueRefreshRequest( queueName );
+        QueueRefreshRequest request = new QueueRefreshRequest( queueName, false );
         ActorRef clientActor = actorSystemManager.getClientActor();
         clientActor.tell( request, null );
     }
@@ -168,6 +167,11 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
                         if ( retries > 1 ) {
                             logger.debug("SUCCESS after {} retries", retries );
                         }
+
+                        logger.debug("{} Requesting refresh if empty for queue: {}", this, queueName);
+                        QueueRefreshRequest qrr = new QueueRefreshRequest( queueName, false );
+                        clientActor.tell( qrr, null );
+
                         return qarm.getSendStatus();
 
                     } else {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
index a81a6fd..b65ad3d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
@@ -24,10 +24,16 @@ import org.apache.commons.lang3.builder.ToStringBuilder;
 
 public class QueueRefreshRequest implements QakkaMessage {
     private final String queueName;
+    private final boolean onlyIfEmpty;
 
 
-    public QueueRefreshRequest(String queueName ) {
+    public QueueRefreshRequest( String queueName, boolean onlyIfEmpty ) {
         this.queueName = queueName;
+        this.onlyIfEmpty = onlyIfEmpty;
+    }
+
+    public boolean isOnlyIfEmpty() {
+        return onlyIfEmpty;
     }
 
     public String getQueueName() {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index 3225a66..c10d1f5 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -175,6 +175,7 @@ public class QueueMessageManagerTest extends AbstractTest {
             if (inMemoryQueue.size( queueName ) == 40) {
                 break;
             }
+            Thread.sleep( 500 );
         }
 
         Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
index 0b8b795..19c1211 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
@@ -95,7 +95,7 @@ public class QueueReaderTest extends AbstractTest {
 
         ActorSystem system = ActorSystem.create("Test-" + queueName);
         ActorRef queueReaderRef = system.actorOf( Props.create( QueueRefresher.class, queueName ), "queueReader");
-        QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName );
+        QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName, false );
 
         // need to wait for refresh to complete
         int maxRetries = 10;