You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/30 21:18:50 UTC
[05/10] usergrid git commit: When writing to queue, if in-memory queue is empty then refresh will be done.
When writing to queue, if in-memory queue is empty then refresh will be done.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/727ff1d6
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/727ff1d6
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/727ff1d6
Branch: refs/heads/usergrid-1318-queue
Commit: 727ff1d642fdaf3768ab2b1f0c07364854ec2e60
Parents: 9306f12
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Sep 21 13:48:47 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Sep 21 13:48:47 2016 -0400
----------------------------------------------------------------------
.../org/apache/usergrid/persistence/qakka/QakkaFig.java | 2 +-
.../persistence/qakka/core/impl/InMemoryQueue.java | 4 ++++
.../persistence/qakka/distributed/actors/QueueActor.java | 11 +++++++----
.../qakka/distributed/actors/QueueWriter.java | 3 +++
.../distributed/impl/DistributedQueueServiceImpl.java | 8 ++++++--
.../qakka/distributed/messages/QueueRefreshRequest.java | 8 +++++++-
.../persistence/qakka/core/QueueMessageManagerTest.java | 1 +
.../qakka/distributed/actors/QueueReaderTest.java | 2 +-
8 files changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index c3f4189..da47c98 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -94,7 +94,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
/** How often to refresh each queue's in-memory data */
@Key(QUEUE_REFRESH_MILLISECONDS)
- @Default("500")
+ @Default("1000")
int getQueueRefreshMilliseconds();
/** How many queue messages to keep in-memory */
http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
index 474ef5c..27de079 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
@@ -78,6 +78,10 @@ public class InMemoryQueue {
return getQueue( queueName ).poll();
}
+ public DatabaseQueueMessage peek( String queueName ) {
+ return getQueue( queueName ).peek();
+ }
+
public int size( String queueName ) {
return getQueue( queueName ).size();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 3b50711..315975c 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -86,7 +86,7 @@ public class QueueActor extends UntypedActor {
Duration.create( 0, TimeUnit.MILLISECONDS),
Duration.create( qakkaFig.getQueueRefreshMilliseconds(), TimeUnit.MILLISECONDS),
self(),
- new QueueRefreshRequest( request.getQueueName() ),
+ new QueueRefreshRequest( request.getQueueName(), false ),
getContext().dispatcher(),
getSelf());
refreshSchedulersByQueueName.put( request.getQueueName(), scheduler );
@@ -121,9 +121,12 @@ public class QueueActor extends UntypedActor {
QueueRefreshRequest request = (QueueRefreshRequest)message;
if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) {
- ActorRef readerRef = getContext().actorOf( Props.create(
- QueueRefresher.class, request.getQueueName()), request.getQueueName() + "_reader");
- queueReadersByQueueName.put( request.getQueueName(), readerRef );
+
+ if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) {
+ ActorRef readerRef = getContext().actorOf( Props.create(
+ QueueRefresher.class, request.getQueueName()), request.getQueueName() + "_reader");
+ queueReadersByQueueName.put( request.getQueueName(), readerRef );
+ }
}
// hand-off to queue's reader
http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index 7166ef1..e54d916 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -25,6 +25,7 @@ import com.google.inject.Injector;
import org.apache.usergrid.persistence.qakka.App;
import org.apache.usergrid.persistence.qakka.MetricsService;
import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResponse;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
@@ -44,6 +45,7 @@ public class QueueWriter extends UntypedActor {
public enum WriteStatus { SUCCESS_XFERLOG_DELETED, SUCCESS_XFERLOG_NOTDELETED, ERROR };
+ private final DistributedQueueService distributedQueueService;
private final QueueMessageSerialization messageSerialization;
private final TransferLogSerialization transferLogSerialization;
private final AuditLogSerialization auditLogSerialization;
@@ -61,6 +63,7 @@ public class QueueWriter extends UntypedActor {
auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
metricsService = injector.getInstance( MetricsService.class );
+ distributedQueueService = injector.getInstance( DistributedQueueService.class );
messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index 3d6a808..9d71c31 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -25,7 +25,6 @@ import akka.util.Timeout;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import org.apache.log4j.net.SyslogAppender;
import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
import org.apache.usergrid.persistence.actorsystem.ClientActor;
import org.apache.usergrid.persistence.qakka.QakkaFig;
@@ -114,7 +113,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
@Override
public void refreshQueue(String queueName) {
logger.info("{} Requesting refresh for queue: {}", this, queueName);
- QueueRefreshRequest request = new QueueRefreshRequest( queueName );
+ QueueRefreshRequest request = new QueueRefreshRequest( queueName, false );
ActorRef clientActor = actorSystemManager.getClientActor();
clientActor.tell( request, null );
}
@@ -168,6 +167,11 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
if ( retries > 1 ) {
logger.debug("SUCCESS after {} retries", retries );
}
+
+ logger.debug("{} Requesting refresh if empty for queue: {}", this, queueName);
+ QueueRefreshRequest qrr = new QueueRefreshRequest( queueName, true ); // onlyIfEmpty=true, matching the "refresh if empty" log message above
+ clientActor.tell( qrr, null );
+
return qarm.getSendStatus();
} else {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
index a81a6fd..b65ad3d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
@@ -24,10 +24,16 @@ import org.apache.commons.lang3.builder.ToStringBuilder;
public class QueueRefreshRequest implements QakkaMessage {
private final String queueName;
+ private final boolean onlyIfEmpty;
- public QueueRefreshRequest(String queueName ) {
+ public QueueRefreshRequest( String queueName, boolean onlyIfEmpty ) {
this.queueName = queueName;
+ this.onlyIfEmpty = onlyIfEmpty;
+ }
+
+ public boolean isOnlyIfEmpty() {
+ return onlyIfEmpty;
}
public String getQueueName() {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index 3225a66..c10d1f5 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -175,6 +175,7 @@ public class QueueMessageManagerTest extends AbstractTest {
if (inMemoryQueue.size( queueName ) == 40) {
break;
}
+ Thread.sleep( 500 );
}
Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
index 0b8b795..19c1211 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
@@ -95,7 +95,7 @@ public class QueueReaderTest extends AbstractTest {
ActorSystem system = ActorSystem.create("Test-" + queueName);
ActorRef queueReaderRef = system.actorOf( Props.create( QueueRefresher.class, queueName ), "queueReader");
- QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName );
+ QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName, false );
// need to wait for refresh to complete
int maxRetries = 10;