You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/30 21:18:46 UTC
[01/10] usergrid git commit: Implement configurable long polling for
Qakka queue gets
Repository: usergrid
Updated Branches:
refs/heads/usergrid-1318-queue c08e8f0e7 -> 5a19ba9a7
Implement configurable long polling for Qakka queue gets
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/434e53e7
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/434e53e7
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/434e53e7
Branch: refs/heads/usergrid-1318-queue
Commit: 434e53e7eed46fbdc3cdeafde9464c90101fa7e3
Parents: c08e8f0
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Sep 20 12:18:33 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Sep 20 12:18:33 2016 -0400
----------------------------------------------------------------------
.../actorsystem/ActorSystemManagerImpl.java | 2 +-
.../apache/usergrid/persistence/qakka/App.java | 25 ++++++---
.../usergrid/persistence/qakka/QakkaFig.java | 13 +++++
.../impl/DistributedQueueServiceImpl.java | 23 ++++++++-
.../qakka/common/CassandraClientTest.java | 46 -----------------
.../qakka/core/CassandraClientTest.java | 46 +++++++++++++++++
.../qakka/core/QueueMessageManagerTest.java | 3 +-
.../distributed/QueueActorServiceTest.java | 8 +--
.../queue/src/test/resources/qakka.properties | 4 ++
.../services/notifications/QueueListener.java | 54 ++++++++++++--------
10 files changed, 140 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index 9fb39b8..3fc191d 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -333,7 +333,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
/**
* Create cluster system for this the current region
*/
- private ActorSystem createClusterSystem( Config config ) {
+ private synchronized ActorSystem createClusterSystem( Config config ) {
// there is only 1 akka system for a Usergrid cluster
final String clusterName = getClusterName();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
index 41bc6fa..abbf3da 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
@@ -22,11 +22,16 @@ package org.apache.usergrid.persistence.qakka;
import com.codahale.metrics.MetricRegistry;
import com.google.inject.Inject;
import com.google.inject.Injector;
+import com.google.inject.Singleton;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
import org.apache.usergrid.persistence.core.migration.schema.MigrationException;
import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
+import org.apache.usergrid.persistence.qakka.core.Queue;
import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.distributed.impl.QueueActorRouterProducer;
+import org.apache.usergrid.persistence.qakka.distributed.impl.QueueSenderRouterProducer;
+import org.apache.usergrid.persistence.qakka.distributed.impl.QueueWriterRouterProducer;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +40,7 @@ import org.slf4j.LoggerFactory;
/**
* Akka queueing application
*/
+@Singleton
public class App implements MetricsService {
private static final Logger logger = LoggerFactory.getLogger( App.class );
@@ -50,6 +56,7 @@ public class App implements MetricsService {
@Inject
public App(
Injector injector,
+ QakkaFig qakkaFig,
ActorSystemFig actorSystemFig,
ActorSystemManager actorSystemManager,
DistributedQueueService distributedQueueService,
@@ -59,12 +66,18 @@ public class App implements MetricsService {
this.actorSystemFig = actorSystemFig;
this.actorSystemManager = actorSystemManager;
this.distributedQueueService = distributedQueueService;
-//
-// try {
-// migrationManager.migrate();
-// } catch (MigrationException e) {
-// throw new QakkaRuntimeException( "Error running migration", e );
-// }
+
+ if ( qakkaFig.getStandalone() ) {
+
+ try {
+ migrationManager.migrate();
+ } catch (MigrationException e) {
+ throw new QakkaRuntimeException( "Error running migration", e );
+ }
+ actorSystemManager.registerRouterProducer( injector.getInstance( QueueActorRouterProducer.class ) );
+ actorSystemManager.registerRouterProducer( injector.getInstance( QueueWriterRouterProducer.class ) );
+ actorSystemManager.registerRouterProducer( injector.getInstance( QueueSenderRouterProducer.class ) );
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index 3b901b2..aa4e349 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -30,6 +30,8 @@ import java.io.Serializable;
@FigSingleton
public interface QakkaFig extends GuicyFig, Serializable {
+ String QUEUE_STANDALONE = "queue.standalone";
+
String QUEUE_NUM_ACTORS = "queue.num.actors";
String QUEUE_SENDER_NUM_ACTORS = "queue.sender.num.actors";
@@ -58,6 +60,13 @@ public interface QakkaFig extends GuicyFig, Serializable {
String QUEUE_SHARD_MAX_SIZE = "queue.shard.max.size";
+ String QUEUE_LONG_POLL_TIME_MILLIS = "queue.long.polling.time.millis";
+
+
+ /** True if Qakka is running standlone */
+ @Key(QUEUE_STANDALONE)
+ @Default("false")
+ boolean getStandalone();
/** Queue senders send to queue writers */
@Key(QUEUE_SENDER_NUM_ACTORS)
@@ -128,4 +137,8 @@ public interface QakkaFig extends GuicyFig, Serializable {
@Key(QUEUE_SHARD_MAX_SIZE)
@Default("400000")
long getMaxShardSize();
+
+ @Key(QUEUE_LONG_POLL_TIME_MILLIS)
+ @Default("5000")
+ long getLongPollTimeMillis();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index 4737347..be20cde 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -25,6 +25,7 @@ import akka.util.Timeout;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import org.apache.log4j.net.SyslogAppender;
import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
import org.apache.usergrid.persistence.actorsystem.ClientActor;
import org.apache.usergrid.persistence.qakka.QakkaFig;
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
@@ -107,7 +109,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
@Override
public void refreshQueue(String queueName) {
- logger.info("Refreshing queue: {}", queueName);
+ logger.info("{} Requesting refresh for queue: {}", this, queueName);
QueueRefreshRequest request = new QueueRefreshRequest( queueName );
ActorRef clientActor = actorSystemManager.getClientActor();
clientActor.tell( request, null );
@@ -186,6 +188,25 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
@Override
public Collection<DatabaseQueueMessage> getNextMessages( String queueName, int count ) {
+ List<DatabaseQueueMessage> ret = new ArrayList<>();
+
+ long startTime = System.currentTimeMillis();
+
+ while ( ret.size() < count
+ && System.currentTimeMillis() - startTime < qakkaFig.getLongPollTimeMillis()) {
+
+ ret.addAll( getNextMessagesInternal( queueName, count ));
+
+ if ( ret.size() < count ) {
+ try { Thread.sleep( qakkaFig.getLongPollTimeMillis() / 5 ); } catch (Exception ignored) {}
+ }
+ }
+
+ return ret;
+ }
+
+
+ public Collection<DatabaseQueueMessage> getNextMessagesInternal( String queueName, int count ) {
List<String> queueNames = queueManager.getListOfQueues();
if ( !queueNames.contains( queueName ) ) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java
deleted file mode 100644
index e1f0c7e..0000000
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.qakka.common;
-
-import com.datastax.driver.core.Session;
-import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
-import org.apache.usergrid.persistence.qakka.AbstractTest;
-import org.apache.usergrid.persistence.qakka.core.CassandraClient;
-import org.junit.Test;
-
-
-/**
- * Created by russo on 6/8/16.
- */
-public class CassandraClientTest extends AbstractTest {
-
- @Test
- public void getClient(){
-
- CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
-
- Session session = cassandraClient.getApplicationSession();
-
- session.getLoggedKeyspace();
-
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/CassandraClientTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/CassandraClientTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/CassandraClientTest.java
new file mode 100644
index 0000000..416de0e
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/CassandraClientTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core;
+
+import com.datastax.driver.core.Session;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.junit.Test;
+
+
+/**
+ * Created by russo on 6/8/16.
+ */
+public class CassandraClientTest extends AbstractTest {
+
+ @Test
+ public void getClient(){
+
+ CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+
+ Session session = cassandraClient.getApplicationSession();
+
+ session.getLoggedKeyspace();
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index 124cb86..0413f81 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -171,8 +171,7 @@ public class QueueMessageManagerTest extends AbstractTest {
int maxRetries = 15;
int retries = 0;
while ( retries++ < maxRetries ) {
- //distributedQueueService.refresh();
- Thread.sleep( 1000 );
+ distributedQueueService.refresh();
if (inMemoryQueue.size( queueName ) == 40) {
break;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index 0883650..7423424 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -65,8 +65,6 @@ public class QueueActorServiceTest extends AbstractTest {
Injector injector = getInjector();
- CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
String region = actorSystemFig.getRegionLocal();
@@ -117,8 +115,6 @@ public class QueueActorServiceTest extends AbstractTest {
Injector injector = getInjector();
- CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
String region = actorSystemFig.getRegionLocal();
@@ -151,16 +147,16 @@ public class QueueActorServiceTest extends AbstractTest {
queueName, region, region, messageId , null, null);
}
- int maxRetries = 15;
+ int maxRetries = 25;
int retries = 0;
int count = 0;
while ( retries++ < maxRetries ) {
- Thread.sleep( 1000 );
distributedQueueService.refresh();
if (inMemoryQueue.size( queueName ) == 100) {
count = 100;
break;
}
+ Thread.sleep(1000);
}
Assert.assertEquals( 100, count );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index dc7ef48..aacc187 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -18,6 +18,8 @@
# Properties for JUnit tests
+queue.standalone=true
+
usergrid.cluster_name=Test Cluster
usergrid.cluster.hostname=localhost
@@ -43,6 +45,8 @@ queue.shard.allocation.advance.time.millis=200
queue.max.inmemory.shard.counter = 100
+queue.long.polling.time.millis=2000
+
cassandra.hosts=localhost
cassandra.keyspace.application=qakka_test_application
http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 20fbd84..796450b 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -72,12 +72,14 @@ public class QueueListener {
private int consecutiveCallsToRemoveDevices;
public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, Properties props){
- this.queueManagerFactory = smf.getApplicationContext().getBean( Injector.class ).getInstance(LegacyQueueManagerFactory.class);
+ this.queueManagerFactory =
+ smf.getApplicationContext().getBean( Injector.class ).getInstance(LegacyQueueManagerFactory.class);
this.smf = smf;
this.emf = emf;
this.metricsService = smf.getApplicationContext().getBean( Injector.class ).getInstance(MetricsFactory.class);
this.properties = props;
- this.applicationQueueManagerCache = smf.getApplicationContext().getBean(Injector.class).getInstance(ApplicationQueueManagerCache.class);
+ this.applicationQueueManagerCache =
+ smf.getApplicationContext().getBean(Injector.class).getInstance(ApplicationQueueManagerCache.class);
}
@@ -94,12 +96,15 @@ public class QueueListener {
try {
- sleepBetweenRuns = new Long(properties.getProperty("usergrid.push.sleep", "" + DEFAULT_SLEEP));
- sleepWhenNoneFound = new Long(properties.getProperty("usergrid.push.sleep", "" + DEFAULT_SLEEP));
- consecutiveCallsToRemoveDevices = new Integer(properties.getProperty("usergrid.notifications.inactive.interval", ""+200));
+ sleepBetweenRuns = new Long(properties.getProperty("usergrid.push.worker.sleep", "" + DEFAULT_SLEEP));
+ sleepWhenNoneFound = new Long(properties.getProperty("usergrid.push.worker.sleep", "" + DEFAULT_SLEEP));
+
+ consecutiveCallsToRemoveDevices =
+ new Integer(properties.getProperty("usergrid.notifications.inactive.interval", ""+200));
queueName = ApplicationQueueManagerImpl.getQueueNames(properties);
- int maxThreads = new Integer(properties.getProperty("usergrid.push.worker_count", ""+PUSH_CONSUMER_MAX_THREADS));
+ int maxThreads =
+ new Integer(properties.getProperty("usergrid.push.worker_count", ""+PUSH_CONSUMER_MAX_THREADS));
futures = new ArrayList<>(maxThreads);
@@ -166,6 +171,13 @@ public class QueueListener {
while ( true ) {
+ if(sleepBetweenRuns > 0) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("sleep between rounds...sleep...{}", sleepBetweenRuns);
+ }
+ try { Thread.sleep(sleepBetweenRuns); } catch (InterruptedException ignored) { }
+ }
+
Timer.Context timerContext = timer.time();
rx.Observable.from( legacyQueueManager.getMessages(MAX_TAKE, ApplicationQueueMessage.class))
.buffer(MAX_TAKE)
@@ -173,7 +185,7 @@ public class QueueListener {
try {
if (logger.isTraceEnabled()) {
- logger.trace("retrieved batch of {} messages from queue {}", messages.size(), queueName);
+ logger.trace("retrieved batch of {} messages from queue {}",messages.size(),queueName);
}
if (messages.size() > 0) {
@@ -185,12 +197,13 @@ public class QueueListener {
ApplicationQueueMessage queueMessage = (ApplicationQueueMessage) message.getBody();
UUID applicationId = queueMessage.getApplicationId();
- //Groups queue messages by application Id, ( they are all probably going to the same place )
+ // Groups queue messages by application Id,
+ // (they are all probably going to the same place)
if (!messageMap.containsKey(applicationId)) {
//For each app id it sends the set.
- List<LegacyQueueMessage> applicationQueueMessages = new ArrayList<LegacyQueueMessage>();
- applicationQueueMessages.add(message);
- messageMap.put(applicationId, applicationQueueMessages);
+ List<LegacyQueueMessage> lqms = new ArrayList<LegacyQueueMessage>();
+ lqms.add(message);
+ messageMap.put(applicationId, lqms);
} else {
messageMap.get(applicationId).add(message);
}
@@ -207,13 +220,15 @@ public class QueueListener {
.getApplicationQueueManager(
emf.getEntityManager(applicationId),
legacyQueueManager,
- new JobScheduler(smf.getServiceManager(applicationId), emf.getEntityManager(applicationId)),
+ new JobScheduler(smf.getServiceManager(applicationId),
+ emf.getEntityManager(applicationId)),
metricsService,
properties
);
if (logger.isTraceEnabled()) {
- logger.trace("send batch for app {} of {} messages", entry.getKey(), entry.getValue().size());
+ logger.trace("send batch for app {} of {} messages",
+ entry.getKey(), entry.getValue().size());
}
Observable current = manager.sendBatchToProviders(entry.getValue(),queueName);
@@ -231,20 +246,15 @@ public class QueueListener {
meter.mark(messages.size());
if (logger.isTraceEnabled()) {
- logger.trace("sent batch {} messages duration {} ms", messages.size(), System.currentTimeMillis() - now);
+ logger.trace("sent batch {} messages duration {} ms",
+ messages.size(), System.currentTimeMillis() - now);
}
- if(sleepBetweenRuns > 0) {
- if (logger.isTraceEnabled()) {
- logger.trace("sleep between rounds...sleep...{}", sleepBetweenRuns);
- }
- Thread.sleep(sleepBetweenRuns);
- }
if(runCount.incrementAndGet() % consecutiveCallsToRemoveDevices == 0){
- for(ApplicationQueueManager applicationQueueManager : applicationQueueManagerCache.asMap().values()){
+ for(ApplicationQueueManager aqm : applicationQueueManagerCache.asMap().values()){
try {
- applicationQueueManager.asyncCheckForInactiveDevices();
+ aqm.asyncCheckForInactiveDevices();
}catch (Exception inactiveDeviceException){
logger.error("Inactive Device Get failed",inactiveDeviceException);
}
[06/10] usergrid git commit: Use same timeout value as old queue
implementation did
Posted by sn...@apache.org.
Use same timeout value as old queue implementation did
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c08e02a9
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c08e02a9
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c08e02a9
Branch: refs/heads/usergrid-1318-queue
Commit: c08e02a914d98bd048f997cddd47b343ba9908c1
Parents: 727ff1d
Author: Dave Johnson <sn...@apache.org>
Authored: Thu Sep 22 07:47:07 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Thu Sep 22 07:47:07 2016 -0400
----------------------------------------------------------------------
.../main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c08e02a9/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index da47c98..472c241 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -89,7 +89,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
/** Time for queue messages to timeout, if not set per queue */
@Key(QUEUE_TIMEOUT_SECONDS)
- @Default("10")
+ @Default("30")
int getQueueTimeoutSeconds();
/** How often to refresh each queue's in-memory data */
[03/10] usergrid git commit: Introduce maxTtl for queue message and
message payload data, default is two weeks.
Posted by sn...@apache.org.
Introduce maxTtl for queue message and message payload data, default is two weeks.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/8b79fb87
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/8b79fb87
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/8b79fb87
Branch: refs/heads/usergrid-1318-queue
Commit: 8b79fb875a3aea88458e08429286d2119d6a3c88
Parents: 6c204b9
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Sep 20 14:30:20 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Sep 20 14:30:20 2016 -0400
----------------------------------------------------------------------
.../usergrid/persistence/qakka/QakkaFig.java | 7 +++++++
.../impl/QueueMessageSerializationImpl.java | 22 ++++++++++++--------
2 files changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8b79fb87/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index aa4e349..c66001d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -62,6 +62,8 @@ public interface QakkaFig extends GuicyFig, Serializable {
String QUEUE_LONG_POLL_TIME_MILLIS = "queue.long.polling.time.millis";
+ String QUEUE_MAX_TTL = "queue.max.ttl";
+
/** True if Qakka is running standlone */
@Key(QUEUE_STANDALONE)
@@ -141,4 +143,9 @@ public interface QakkaFig extends GuicyFig, Serializable {
@Key(QUEUE_LONG_POLL_TIME_MILLIS)
@Default("5000")
long getLongPollTimeMillis();
+
+ /** Max time-to-live for queue message and payload data */
+ @Key(QUEUE_MAX_TTL)
+ @Default("1209600") // default is two weeks
+ int getMaxTtlSeconds();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8b79fb87/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
index d868021..02862c4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -19,7 +19,6 @@
package org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl;
-import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.Clause;
@@ -32,6 +31,7 @@ import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.core.CassandraClient;
import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
@@ -49,12 +49,13 @@ import java.util.UUID;
public class QueueMessageSerializationImpl implements QueueMessageSerialization {
-
private static final Logger logger = LoggerFactory.getLogger( QueueMessageSerializationImpl.class );
private final CassandraClient cassandraClient;
private final CassandraConfig cassandraConfig;
+ private final int maxTtl;
+
private final ActorSystemFig actorSystemFig;
private final ShardStrategy shardStrategy;
private final ShardCounterSerialization shardCounterSerialization;
@@ -109,17 +110,20 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
@Inject
public QueueMessageSerializationImpl(
- CassandraConfig cassandraConfig,
+ CassandraConfig cassandraConfig,
ActorSystemFig actorSystemFig,
ShardStrategy shardStrategy,
ShardCounterSerialization shardCounterSerialization,
- CassandraClient cassandraClient
+ CassandraClient cassandraClient,
+ QakkaFig qakkaFig
) {
this.cassandraConfig = cassandraConfig;
this.actorSystemFig = actorSystemFig;
this.shardStrategy = shardStrategy;
this.shardCounterSerialization = shardCounterSerialization;
this.cassandraClient = cassandraClient;
+
+ this.maxTtl = qakkaFig.getMaxTtlSeconds();
}
@@ -151,7 +155,8 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
.value( COLUMN_MESSAGE_ID, message.getMessageId())
.value( COLUMN_QUEUE_MESSAGE_ID, queueMessageId)
.value( COLUMN_INFLIGHT_AT, inflightAt )
- .value( COLUMN_QUEUED_AT, queuedAt);
+ .value( COLUMN_QUEUED_AT, queuedAt)
+ .using( QueryBuilder.ttl( maxTtl ) );
cassandraClient.getQueueMessageSession().execute(insert);
@@ -244,9 +249,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
.and(shardIdClause)
.and(queueMessageIdClause);
- ResultSet resultSet = cassandraClient.getQueueMessageSession().execute( delete );
-
- String s = "s";
+ cassandraClient.getQueueMessageSession().execute( delete );
}
@@ -275,7 +278,8 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
Statement insert = QueryBuilder.insertInto(TABLE_MESSAGE_DATA)
.value( COLUMN_MESSAGE_ID, messageId)
.value( COLUMN_MESSAGE_DATA, messageBody.getBlob())
- .value( COLUMN_CONTENT_TYPE, messageBody.getContentType());
+ .value( COLUMN_CONTENT_TYPE, messageBody.getContentType())
+ .using( QueryBuilder.ttl( maxTtl ) );
cassandraClient.getApplicationSession().execute(insert);
}
[08/10] usergrid git commit: Now supports
elasticsearch.queue_impl=MULTIREGION setting instead of SNS,
also more/better DEBUG logging
Posted by sn...@apache.org.
Now supports elasticsearch.queue_impl=MULTIREGION setting instead of SNS, also more/better DEBUG logging
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2cd8ecb3
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2cd8ecb3
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2cd8ecb3
Branch: refs/heads/usergrid-1318-queue
Commit: 2cd8ecb30fc5f97927ec000573daba2ba16e914f
Parents: 00eb139
Author: Dave Johnson <sn...@apache.org>
Authored: Fri Sep 23 12:47:29 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Fri Sep 23 12:47:29 2016 -0400
----------------------------------------------------------------------
.../asyncevents/AsyncIndexProvider.java | 46 ++++++++++++----
.../qakka/distributed/actors/QueueActor.java | 53 ++++++++++++++++--
.../distributed/actors/QueueRefresher.java | 56 +++++++++++++++-----
.../distributed/actors/QueueTimeouter.java | 44 +++++++++++++--
.../impl/DistributedQueueServiceImpl.java | 12 +++--
.../queue/impl/QueueManagerFactoryImpl.java | 7 ++-
.../queue/src/test/resources/log4j.properties | 11 ++--
tests/performance/results.txt | 1 +
8 files changed, 188 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index aac0e66..f4a9bd2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -103,16 +103,44 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
final Implementations impl = Implementations.valueOf(value);
switch (impl) {
+
case LOCAL:
- AsyncEventServiceImpl eventService = new AsyncEventServiceImpl(scope -> new LocalQueueManager(), indexProcessorFig, indexProducer, metricsFactory,
- entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder,mapManagerFactory, queueFig,rxTaskScheduler);
+ AsyncEventServiceImpl eventService =
+ new AsyncEventServiceImpl(scope -> new LocalQueueManager(),
+ indexProcessorFig,
+ indexProducer,
+ metricsFactory,
+ entityCollectionManagerFactory,
+ indexLocationStrategyFactory,
+ entityIndexFactory,
+ eventBuilder,
+ mapManagerFactory,
+ queueFig,rxTaskScheduler);
eventService.MAX_TAKE = 1000;
return eventService;
+
case SQS:
- throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead with only a single region");
+ throw new IllegalArgumentException(
+ "Configuration value of SQS is no longer allowed. Use SNS instead with only a single region.");
+
case SNS:
- return new AsyncEventServiceImpl(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
- entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler );
+ throw new IllegalArgumentException(
+ "Configuration value of SNS is no longer allowed. Use MULTIREGION instead. ");
+
+ case MULTIREGION:
+ return new AsyncEventServiceImpl(
+ queueManagerFactory,
+ indexProcessorFig,
+ indexProducer,
+ metricsFactory,
+ entityCollectionManagerFactory,
+ indexLocationStrategyFactory,
+ entityIndexFactory,
+ eventBuilder,
+ mapManagerFactory,
+ queueFig,
+ rxTaskScheduler );
+
default:
throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
}
@@ -135,12 +163,12 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
/**
* Different implementations
*/
- public static enum Implementations { //TODO see about removing SNS and SQS and use AMZN? - michaelarusso
+ public static enum Implementations {
TEST,
LOCAL,
- SQS,
- SNS;
-
+ SQS, // deprecated
+ SNS, // deprecated
+ MULTIREGION; // built-in Akka-based queue
public String asString() {
return toString();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 315975c..87342ad 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -37,11 +37,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.text.DecimalFormat;
+import java.util.*;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
public class QueueActor extends UntypedActor {
@@ -62,6 +61,10 @@ public class QueueActor extends UntypedActor {
private final Map<String, ActorRef> queueTimeoutersByQueueName = new HashMap<>();
private final Map<String, ActorRef> shardAllocatorsByQueueName = new HashMap<>();
+ private final AtomicLong runCount = new AtomicLong(0);
+ private final AtomicLong messageCount = new AtomicLong(0);
+ private final Set<String> queuesSeen = new HashSet<>();
+
public QueueActor() {
@@ -81,6 +84,8 @@ public class QueueActor extends UntypedActor {
if ( message instanceof QueueInitRequest) {
QueueInitRequest request = (QueueInitRequest)message;
+ queuesSeen.add( request.getQueueName() );
+
if ( refreshSchedulersByQueueName.get( request.getQueueName() ) == null ) {
Cancellable scheduler = getContext().system().scheduler().schedule(
Duration.create( 0, TimeUnit.MILLISECONDS),
@@ -120,6 +125,8 @@ public class QueueActor extends UntypedActor {
} else if ( message instanceof QueueRefreshRequest ) {
QueueRefreshRequest request = (QueueRefreshRequest)message;
+ queuesSeen.add( request.getQueueName() );
+
if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) {
if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) {
@@ -135,6 +142,8 @@ public class QueueActor extends UntypedActor {
} else if ( message instanceof QueueTimeoutRequest ) {
QueueTimeoutRequest request = (QueueTimeoutRequest)message;
+ queuesSeen.add( request.getQueueName() );
+
if ( queueTimeoutersByQueueName.get( request.getQueueName() ) == null ) {
ActorRef readerRef = getContext().actorOf( Props.create(
QueueTimeouter.class, request.getQueueName()), request.getQueueName() + "_timeouter");
@@ -148,6 +157,8 @@ public class QueueActor extends UntypedActor {
} else if ( message instanceof ShardCheckRequest ) {
ShardCheckRequest request = (ShardCheckRequest)message;
+ queuesSeen.add( request.getQueueName() );
+
if ( shardAllocatorsByQueueName.get( request.getQueueName() ) == null ) {
ActorRef readerRef = getContext().actorOf( Props.create(
ShardAllocator.class, request.getQueueName()), request.getQueueName() + "_shard_allocator");
@@ -164,6 +175,8 @@ public class QueueActor extends UntypedActor {
try {
QueueGetRequest queueGetRequest = (QueueGetRequest) message;
+ queuesSeen.add( queueGetRequest.getQueueName() );
+
Collection<DatabaseQueueMessage> queueMessages = new ArrayList<>();
while (queueMessages.size() < queueGetRequest.getNumRequested()) {
@@ -189,6 +202,36 @@ public class QueueActor extends UntypedActor {
getSender().tell( new QueueGetResponse(
DistributedQueueService.Status.SUCCESS, queueMessages ), getSender() );
+ long runs = runCount.incrementAndGet();
+ long messagesReturned = messageCount.addAndGet( queueMessages.size() );
+
+ if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
+
+ final DecimalFormat format = new DecimalFormat("##.###");
+ final long nano = 1000000000;
+ Timer t = metricsService.getMetricRegistry().timer(MetricsService.GET_TIME_GET );
+
+ logger.debug("QueueActor get stats (queues {}):\n" +
+ " Num runs={}\n" +
+ " Messages={}\n" +
+ " Mean={}\n" +
+ " One min rate={}\n" +
+ " Five min rate={}\n" +
+ " Snapshot mean={}\n" +
+ " Snapshot min={}\n" +
+ " Snapshot max={}",
+ queuesSeen.toArray(),
+ t.getCount(),
+ messagesReturned,
+ format.format( t.getMeanRate() ),
+ format.format( t.getOneMinuteRate() ),
+ format.format( t.getFiveMinuteRate() ),
+ format.format( t.getSnapshot().getMean() / nano ),
+ format.format( (double) t.getSnapshot().getMin() / nano ),
+ format.format( (double) t.getSnapshot().getMax() / nano ) );
+ }
+
+
} finally {
timer.close();
}
@@ -201,6 +244,8 @@ public class QueueActor extends UntypedActor {
QueueAckRequest queueAckRequest = (QueueAckRequest) message;
+ queuesSeen.add( queueAckRequest.getQueueName() );
+
DistributedQueueService.Status status = queueActorHelper.ackQueueMessage(
queueAckRequest.getQueueName(),
queueAckRequest.getQueueMessageId() );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
index ebc328f..dbd5235 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
@@ -39,28 +39,31 @@ import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterato
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.text.DecimalFormat;
import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
public class QueueRefresher extends UntypedActor {
private static final Logger logger = LoggerFactory.getLogger( QueueRefresher.class );
- private final String queueName;
-
- private final QueueMessageSerialization serialization;
- private final InMemoryQueue inMemoryQueue;
- private final QakkaFig qakkaFig;
- private final ActorSystemFig actorSystemFig;
- private final MetricsService metricsService;
+ private final String queueName;
+ private final InMemoryQueue inMemoryQueue;
+ private final QakkaFig qakkaFig;
+ private final ActorSystemFig actorSystemFig;
+ private final MetricsService metricsService;
private final CassandraClient cassandraClient;
+ private final AtomicLong runCount = new AtomicLong(0);
+ private final AtomicLong totalRead = new AtomicLong(0);
+
+
public QueueRefresher(String queueName ) {
this.queueName = queueName;
Injector injector = App.INJECTOR;
- serialization = injector.getInstance( QueueMessageSerialization.class );
inMemoryQueue = injector.getInstance( InMemoryQueue.class );
qakkaFig = injector.getInstance( QakkaFig.class );
actorSystemFig = injector.getInstance( ActorSystemFig.class );
@@ -76,7 +79,7 @@ public class QueueRefresher extends UntypedActor {
QueueRefreshRequest request = (QueueRefreshRequest) message;
- logger.debug( "running for queue {}", queueName );
+ //logger.debug( "running for queue {}", queueName );
if (!request.getQueueName().equals( queueName )) {
throw new QakkaRuntimeException(
@@ -109,10 +112,39 @@ public class QueueRefresher extends UntypedActor {
count++;
}
- if ( count > 0 ) {
- logger.debug( "Added {} in-memory for queue {}, new size = {}",
- count, queueName, inMemoryQueue.size( queueName ) );
+ long runs = runCount.incrementAndGet();
+ long readCount = totalRead.addAndGet( count );
+
+ if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
+
+ final DecimalFormat format = new DecimalFormat("##.###");
+ final long nano = 1000000000;
+ Timer t = metricsService.getMetricRegistry().timer(MetricsService.REFRESH_TIME );
+
+ logger.debug("QueueRefresher for queue '{}' stats:\n" +
+ " Num runs={}\n" +
+ " Read count={}\n" +
+ " Mean={}\n" +
+ " One min rate={}\n" +
+ " Five min rate={}\n" +
+ " Snapshot mean={}\n" +
+ " Snapshot min={}\n" +
+ " Snapshot max={}",
+ queueName,
+ t.getCount(),
+ readCount,
+ format.format( t.getMeanRate() ),
+ format.format( t.getOneMinuteRate() ),
+ format.format( t.getFiveMinuteRate() ),
+ format.format( t.getSnapshot().getMean() / nano ),
+ format.format( (double) t.getSnapshot().getMin() / nano ),
+ format.format( (double) t.getSnapshot().getMax() / nano ) );
}
+
+// if ( count > 0 ) {
+// logger.debug( "Added {} in-memory for queue {}, new size = {}",
+// count, queueName, inMemoryQueue.size( queueName ) );
+// }
}
} finally {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
index 7806d30..b47aac6 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -40,8 +40,10 @@ import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterato
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.text.DecimalFormat;
import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
public class QueueTimeouter extends UntypedActor {
@@ -54,9 +56,11 @@ public class QueueTimeouter extends UntypedActor {
private final ActorSystemFig actorSystemFig;
private final QakkaFig qakkaFig;
private final CassandraClient cassandraClient;
-
private final MessageCounterSerialization messageCounterSerialization;
+ private final AtomicLong runCount = new AtomicLong(0);
+ private final AtomicLong totalTimedout = new AtomicLong(0);
+
public QueueTimeouter(String queueName ) {
this.queueName = queueName;
@@ -137,13 +141,43 @@ public class QueueTimeouter extends UntypedActor {
}
}
- if (count > 0) {
- logger.debug( "Timed out {} messages for queue {}", count, queueName );
- messageCounterSerialization.decrementCounter(
- queueName, DatabaseQueueMessage.Type.DEFAULT, count);
+ long runs = runCount.incrementAndGet();
+ long timeoutCount = totalTimedout.addAndGet( count );
+
+ if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
+
+ final DecimalFormat format = new DecimalFormat("##.###");
+ final long nano = 1000000000;
+ Timer t = metricsService.getMetricRegistry().timer(MetricsService.TIMEOUT_TIME );
+
+ logger.debug("QueueTimeouter for queue '{}' stats:\n" +
+ " Num runs={}\n" +
+ " Timeout count={}\n" +
+ " Mean={}\n" +
+ " One min rate={}\n" +
+ " Five min rate={}\n" +
+ " Snapshot mean={}\n" +
+ " Snapshot min={}\n" +
+ " Snapshot max={}",
+ queueName,
+ t.getCount(),
+ timeoutCount,
+ format.format( t.getMeanRate() ),
+ format.format( t.getOneMinuteRate() ),
+ format.format( t.getFiveMinuteRate() ),
+ format.format( t.getSnapshot().getMean() / nano ),
+ format.format( (double) t.getSnapshot().getMin() / nano ),
+ format.format( (double) t.getSnapshot().getMax() / nano ) );
}
+// if (count > 0) {
+// logger.debug( "Timed out {} messages for queue {}", count, queueName );
+//
+// messageCounterSerialization.decrementCounter(
+// queueName, DatabaseQueueMessage.Type.DEFAULT, count);
+// }
+
} finally {
timer.close();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index 9d71c31..bcb6b79 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -39,10 +39,7 @@ import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.TimeUnit;
@@ -168,7 +165,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
logger.debug("SUCCESS after {} retries", retries );
}
- logger.debug("{} Requesting refresh if empty for queue: {}", this, queueName);
+ // send refresh-queue-if-empty message
QueueRefreshRequest qrr = new QueueRefreshRequest( queueName, false );
clientActor.tell( qrr, null );
@@ -221,6 +218,11 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
}
+ if ( actorSystemManager.getClientActor() == null || !actorSystemManager.isReady() ) {
+ logger.error("Akka Actor System is not ready yet for requests.");
+ return Collections.EMPTY_LIST;
+ }
+
int maxRetries = qakkaFig.getMaxGetRetries();
int retries = 0;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
index e1bf239..4a41dda 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
@@ -53,15 +53,18 @@ public class QueueManagerFactoryImpl implements LegacyQueueManagerFactory {
if ( queueFig.overrideQueueForDefault() ){
LegacyQueueManager manager = defaultManager.get( scope.getName() );
- logger.info("Using Queue Implemention: {}", manager.getClass().getSimpleName());
if ( manager == null ) {
+ logger.info("Using LocalQueueManager for scope {}", scope.getName() );
manager = new LocalQueueManager();
defaultManager.put( scope.getName(), manager );
}
return manager;
} else {
- return queuemanagerInternalFactory.getQueueManager(scope);
+ LegacyQueueManager queueManager = queuemanagerInternalFactory.getQueueManager( scope );
+ logger.info("Using queue manager {} for scope {}",
+ queueManager.getClass().getSimpleName(), scope.getName() );
+ return queueManager;
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/stack/corepersistence/queue/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/log4j.properties b/stack/corepersistence/queue/src/test/resources/log4j.properties
index 9e14f29..b207ea3 100644
--- a/stack/corepersistence/queue/src/test/resources/log4j.properties
+++ b/stack/corepersistence/queue/src/test/resources/log4j.properties
@@ -21,12 +21,13 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) %c{1} - %m%n
-log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG
-log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG
-log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
-
log4j.logger.org.apache.cassandra=WARN
log4j.logger.org.glassfish=WARN
-log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
+#log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG
+#log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG
+#log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
+log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
+log4j.logger.org.apache.usergrid.persistence.queue=DEBUG
+log4j.logger.org.apache.usergrid.corepersistence.asyncevents=DEBUG
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/tests/performance/results.txt
----------------------------------------------------------------------
diff --git a/tests/performance/results.txt b/tests/performance/results.txt
new file mode 100644
index 0000000..fb63637
--- /dev/null
+++ b/tests/performance/results.txt
@@ -0,0 +1 @@
+collection,name,uuid,modified,status,error,lastStatus
[02/10] usergrid git commit: Ensure local keyspace has unique name
per datacenter / region.
Posted by sn...@apache.org.
Ensure local keyspace has unique name per datacenter / region.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/6c204b9f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/6c204b9f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/6c204b9f
Branch: refs/heads/usergrid-1318-queue
Commit: 6c204b9f046fa9ea234d7e608b940ead60fb6720
Parents: 434e53e
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Sep 20 13:27:17 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Sep 20 13:27:17 2016 -0400
----------------------------------------------------------------------
.../persistence/core/CassandraConfigImpl.java | 7 +++++--
.../usergrid/persistence/core/CassandraFig.java | 1 +
.../auditlog/impl/AuditLogSerializationImpl.java | 10 +++++-----
.../impl/MessageCounterSerializationImpl.java | 10 +++++-----
.../impl/QueueMessageSerializationImpl.java | 14 +++++++-------
.../queues/impl/QueueSerializationImpl.java | 10 +++++-----
.../impl/ShardCounterSerializationImpl.java | 10 +++++-----
.../sharding/impl/ShardSerializationImpl.java | 12 ++++++------
.../impl/TransferLogSerializationImpl.java | 10 +++++-----
.../persistence/queue/impl/QakkaQueueManager.java | 6 ------
.../queue/impl/SNSQueueManagerImpl.java | 10 +++++-----
...ultiShardDatabaseQueueMessageIteratorTest.java | 6 +++---
.../serialization/sharding/ShardIteratorTest.java | 14 +++++++-------
.../sharding/ShardSerializationTest.java | 18 +++++++++---------
14 files changed, 68 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
index 729f5b2..77f7228 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
@@ -64,13 +64,16 @@ public class CassandraConfigImpl implements CassandraConfig {
this.dataStaxReadCl = com.datastax.driver.core.ConsistencyLevel.valueOf( cassandraFig.getReadCl());
- this.dataStaxReadConsistentCl = com.datastax.driver.core.ConsistencyLevel.valueOf( cassandraFig.getReadClConsistent());
+ this.dataStaxReadConsistentCl = com.datastax.driver.core.ConsistencyLevel.valueOf(
+ cassandraFig.getReadClConsistent());
this.dataStaxWriteCl = com.datastax.driver.core.ConsistencyLevel.valueOf( cassandraFig.getWriteCl() );
this.applicationKeyspace = cassandraFig.getApplicationKeyspace();
- this.applicationLocalKeyspace = cassandraFig.getApplicationLocalKeyspace();
+ this.applicationLocalKeyspace =
+ cassandraFig.getApplicationLocalKeyspace() + "_"
+ + cassandraFig.getLocalDataCenter().replace("-", "_");
//add the listeners to update the values
cassandraFig.addPropertyChangeListener( new PropertyChangeListener() {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
index bc8d087..1faf1e7 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
@@ -91,6 +91,7 @@ public interface CassandraFig extends GuicyFig {
@Default( "Usergrid_Applications" )
String getApplicationKeyspace();
+ /** Prefix for local keyspace name. Name will be this prefix plus "_" plus the local data center name (with any "-" replaced by "_"). */
@Key( "cassandra.keyspace.application.local" )
@Default( "Usergrid_Applications_Local" )
String getApplicationLocalKeyspace();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java
index ddbd345..93dfe4b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java
@@ -24,7 +24,7 @@ import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.inject.Inject;
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
@@ -47,7 +47,7 @@ public class AuditLogSerializationImpl implements AuditLogSerialization {
private static final Logger logger = LoggerFactory.getLogger( AuditLogSerializationImpl.class );
private final CassandraClient cassandraClient;
- private final CassandraFig cassandraFig;
+ private final CassandraConfig cassandraConfig;
public final static String TABLE_AUDIT_LOG = "audit_log";
@@ -77,8 +77,8 @@ public class AuditLogSerializationImpl implements AuditLogSerialization {
@Inject
- public AuditLogSerializationImpl( CassandraFig cassandraFig, CassandraClient cassandraClient ) {
- this.cassandraFig = cassandraFig;
+ public AuditLogSerializationImpl( CassandraConfig cassandraConfig, CassandraClient cassandraClient ) {
+ this.cassandraConfig = cassandraConfig;
this.cassandraClient = cassandraClient;
}
@@ -147,6 +147,6 @@ public class AuditLogSerializationImpl implements AuditLogSerialization {
@Override
public Collection<TableDefinition> getTables() {
return Collections.singletonList(
- new TableDefinitionStringImpl( cassandraFig.getApplicationKeyspace(), TABLE_AUDIT_LOG, CQL ) );
+ new TableDefinitionStringImpl( cassandraConfig.getApplicationKeyspace(), TABLE_AUDIT_LOG, CQL ) );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
index 5206ec7..f198d05 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
@@ -24,7 +24,7 @@ import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
@@ -51,7 +51,7 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
private static final Logger logger = LoggerFactory.getLogger( MessageCounterSerializationImpl.class );
private final CassandraClient cassandraClient;
- private final CassandraFig cassandraFig;
+ private final CassandraConfig cassandraConfig;
final static String TABLE_SHARD_COUNTERS = "counters";
final static String COLUMN_QUEUE_NAME = "queue_name";
@@ -95,8 +95,8 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
@Inject
- public MessageCounterSerializationImpl( CassandraFig cassandraFig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
- this.cassandraFig = cassandraFig;
+ public MessageCounterSerializationImpl( CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+ this.cassandraConfig = cassandraConfig;
this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter();
this.cassandraClient = cassandraClient;
}
@@ -202,7 +202,7 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
@Override
public Collection<TableDefinition> getTables() {
return Collections.singletonList(
- new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(), TABLE_SHARD_COUNTERS, CQL ) );
+ new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(), TABLE_SHARD_COUNTERS, CQL ) );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
index 99ff783..d868021 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -28,7 +28,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
@@ -53,7 +53,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
private static final Logger logger = LoggerFactory.getLogger( QueueMessageSerializationImpl.class );
private final CassandraClient cassandraClient;
- private final CassandraFig cassandraFig;
+ private final CassandraConfig cassandraConfig;
private final ActorSystemFig actorSystemFig;
private final ShardStrategy shardStrategy;
@@ -109,13 +109,13 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
@Inject
public QueueMessageSerializationImpl(
- CassandraFig cassandraFig,
+ CassandraConfig cassandraConfig,
ActorSystemFig actorSystemFig,
ShardStrategy shardStrategy,
ShardCounterSerialization shardCounterSerialization,
CassandraClient cassandraClient
) {
- this.cassandraFig = cassandraFig;
+ this.cassandraConfig = cassandraConfig;
this.actorSystemFig = actorSystemFig;
this.shardStrategy = shardStrategy;
this.shardCounterSerialization = shardCounterSerialization;
@@ -316,13 +316,13 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
public Collection<TableDefinition> getTables() {
return Lists.newArrayList(
- new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(),
+ new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(),
TABLE_MESSAGES_AVAILABLE, MESSAGES_AVAILABLE ),
- new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(),
+ new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(),
TABLE_MESSAGES_INFLIGHT, MESSAGES_INFLIGHT ),
- new TableDefinitionStringImpl( cassandraFig.getApplicationKeyspace(),
+ new TableDefinitionStringImpl( cassandraConfig.getApplicationKeyspace(),
TABLE_MESSAGE_DATA, MESSAGE_DATA )
);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
index 07a201c..17a48c6 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
@@ -26,7 +26,7 @@ import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.Clause;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.inject.Inject;
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
@@ -47,7 +47,7 @@ public class QueueSerializationImpl implements QueueSerialization {
private static final Logger logger = LoggerFactory.getLogger( QueueMessageSerializationImpl.class );
private final CassandraClient cassandraClient;
- private final CassandraFig cassandraFig;
+ private final CassandraConfig cassandraConfig;
public final static String COLUMN_QUEUE_NAME = "queue_name";
public final static String COLUMN_REGIONS = "regions";
@@ -74,8 +74,8 @@ public class QueueSerializationImpl implements QueueSerialization {
@Inject
- public QueueSerializationImpl( CassandraFig cassandraFig, CassandraClient cassandraClient ) {
- this.cassandraFig = cassandraFig;
+ public QueueSerializationImpl( CassandraConfig cassandraConfig, CassandraClient cassandraClient ) {
+ this.cassandraConfig = cassandraConfig;
this.cassandraClient = cassandraClient;
}
@@ -155,7 +155,7 @@ public class QueueSerializationImpl implements QueueSerialization {
@Override
public Collection<TableDefinition> getTables() {
return Collections.singletonList(
- new TableDefinitionStringImpl( cassandraFig.getApplicationKeyspace(), "queues", CQL ) );
+ new TableDefinitionStringImpl( cassandraConfig.getApplicationKeyspace(), "queues", CQL ) );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
index f14d234..bcfb74d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
@@ -24,7 +24,7 @@ import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
@@ -51,7 +51,7 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization
private static final Logger logger = LoggerFactory.getLogger( ShardCounterSerializationImpl.class );
private final CassandraClient cassandraClient;
- private final CassandraFig cassandraFig;
+ private final CassandraConfig cassandraConfig;
final static String TABLE_COUNTERS = "shard_counters";
final static String COLUMN_QUEUE_NAME = "queue_name";
@@ -91,8 +91,8 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization
@Inject
- public ShardCounterSerializationImpl( CassandraFig cassandraFig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
- this.cassandraFig = cassandraFig;
+ public ShardCounterSerializationImpl( CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+ this.cassandraConfig = cassandraConfig;
this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter();
this.cassandraClient = cassandraClient;
}
@@ -197,6 +197,6 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization
@Override
public Collection<TableDefinition> getTables() {
return Collections.singletonList(
- new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(), "shard_counters", CQL ) );
+ new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(), "shard_counters", CQL ) );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
index 989622b..cc5caab 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
@@ -26,7 +26,7 @@ import com.datastax.driver.core.querybuilder.Clause;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
@@ -46,7 +46,7 @@ public class ShardSerializationImpl implements ShardSerialization {
private static final Logger logger = LoggerFactory.getLogger( ShardSerializationImpl.class );
private final CassandraClient cassandraClient;
- private final CassandraFig cassandraFig;
+ private final CassandraConfig cassandraConfig;
public final static String COLUMN_QUEUE_NAME = "queue_name";
public final static String COLUMN_REGION = "region";
@@ -82,8 +82,8 @@ public class ShardSerializationImpl implements ShardSerialization {
@Inject
- public ShardSerializationImpl( CassandraFig cassandraFig, CassandraClient cassandraClient ) {
- this.cassandraFig = cassandraFig;
+ public ShardSerializationImpl( CassandraConfig cassandraConfig, CassandraClient cassandraClient ) {
+ this.cassandraConfig = cassandraConfig;
this.cassandraClient = cassandraClient;
}
@@ -195,9 +195,9 @@ public class ShardSerializationImpl implements ShardSerialization {
@Override
public Collection<TableDefinition> getTables() {
return Lists.newArrayList(
- new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(),
+ new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(),
TABLE_SHARDS_MESSAGES_AVAILABLE, SHARDS_MESSAGES_AVAILABLE ),
- new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(),
+ new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(),
TABLE_SHARDS_MESSAGES_INFLIGHT, SHARDS_MESSAGES_AVAILABLE_INFLIGHT )
);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java
index 9ebb841..51a168e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java
@@ -25,7 +25,7 @@ import com.datastax.driver.core.Row;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.inject.Inject;
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
@@ -45,7 +45,7 @@ public class TransferLogSerializationImpl implements TransferLogSerialization {
private static final Logger logger = LoggerFactory.getLogger( TransferLogSerializationImpl.class );
private final CassandraClient cassandraClient;
- private final CassandraFig cassandraFig;
+ private final CassandraConfig cassandraConfig;
public final static String TABLE_TRANSFER_LOG = "transfer_log";
@@ -67,8 +67,8 @@ public class TransferLogSerializationImpl implements TransferLogSerialization {
@Inject
- public TransferLogSerializationImpl( CassandraFig cassandraFig, CassandraClient cassandraClient ) {
- this.cassandraFig = cassandraFig;
+ public TransferLogSerializationImpl( CassandraConfig cassandraConfig, CassandraClient cassandraClient ) {
+ this.cassandraConfig = cassandraConfig;
this.cassandraClient = cassandraClient;
}
@@ -164,7 +164,7 @@ public class TransferLogSerializationImpl implements TransferLogSerialization {
@Override
public Collection<TableDefinition> getTables() {
return Collections.singletonList(
- new TableDefinitionStringImpl( cassandraFig.getApplicationKeyspace(), TABLE_TRANSFER_LOG, CQL ) );
+ new TableDefinitionStringImpl( cassandraConfig.getApplicationKeyspace(), TABLE_TRANSFER_LOG, CQL ) );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
index 832cecd..0eb609d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
@@ -42,27 +42,21 @@ public class QakkaQueueManager implements LegacyQueueManager {
private static final Logger logger = LoggerFactory.getLogger( QakkaQueueManager.class );
private final LegacyQueueScope scope;
- private final LegacyQueueFig fig;
private final QueueManager queueManager;
private final QueueMessageManager queueMessageManager;
- private final QakkaFig qakkaFig;
private final Regions regions;
@Inject
public QakkaQueueManager(
@Assisted LegacyQueueScope scope,
- LegacyQueueFig fig,
QueueManager queueManager,
QueueMessageManager queueMessageManager,
- QakkaFig qakkaFig,
Regions regions
) {
this.scope = scope;
- this.fig = fig;
this.queueManager = queueManager;
- this.qakkaFig = qakkaFig;
this.queueMessageManager = queueMessageManager;
this.regions = regions;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 6d4e0c4..637f157 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -32,7 +32,7 @@ import com.amazonaws.ClientConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
import org.apache.usergrid.persistence.queue.LegacyQueue;
@@ -87,7 +87,7 @@ public class SNSQueueManagerImpl implements LegacyQueueManager {
private final LegacyQueueScope scope;
private final LegacyQueueFig fig;
private final ClusterFig clusterFig;
- private final CassandraFig cassandraFig;
+ private final CassandraConfig cassandraConfig;
private final ClientConfiguration clientConfiguration;
private final AmazonSQSClient sqs;
private final AmazonSNSClient sns;
@@ -154,11 +154,11 @@ public class SNSQueueManagerImpl implements LegacyQueueManager {
@Inject
public SNSQueueManagerImpl(@Assisted LegacyQueueScope scope, LegacyQueueFig fig, ClusterFig clusterFig,
- CassandraFig cassandraFig, LegacyQueueFig queueFig ) {
+ CassandraConfig cassandraConfig, LegacyQueueFig queueFig ) {
this.scope = scope;
this.fig = fig;
this.clusterFig = clusterFig;
- this.cassandraFig = cassandraFig;
+ this.cassandraConfig = cassandraConfig;
// create our own executor which has a bounded queue w/ caller runs policy for rejected tasks
@@ -382,7 +382,7 @@ public class SNSQueueManagerImpl implements LegacyQueueManager {
private String getName() {
String name =
- clusterFig.getClusterName() + "_" + cassandraFig.getApplicationKeyspace() + "_" + scope.getName() + "_"
+ clusterFig.getClusterName() + "_" + cassandraConfig.getApplicationKeyspace() + "_" + scope.getName() + "_"
+ scope.getRegionImplementation();
name = name.toLowerCase(); //user lower case values
Preconditions.checkArgument( name.length() <= 80, "Your name must be < than 80 characters" );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
index 5fa3434..053fdd1 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
@@ -20,7 +20,7 @@
package org.apache.usergrid.persistence.qakka.serialization;
import org.apache.commons.lang.RandomStringUtils;
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
import org.apache.usergrid.persistence.qakka.AbstractTest;
import org.apache.usergrid.persistence.qakka.core.CassandraClient;
@@ -51,8 +51,8 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest {
public void testIterator() throws InterruptedException {
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
- ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
+ CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class );
+ ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient );
QueueMessageSerialization queueMessageSerialization =
getInjector().getInstance( QueueMessageSerialization.class );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
index 0d593aa..0c305fa 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
@@ -20,7 +20,7 @@
package org.apache.usergrid.persistence.qakka.serialization.sharding;
import org.apache.commons.lang.RandomStringUtils;
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl;
import org.apache.usergrid.persistence.qakka.AbstractTest;
@@ -47,8 +47,8 @@ public class ShardIteratorTest extends AbstractTest {
public void getActiveShards(){
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
- ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
+ CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class );
+ ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient );
String queueName = "queue_sit_" + RandomStringUtils.randomAlphanumeric( 10 );
@@ -80,8 +80,8 @@ public class ShardIteratorTest extends AbstractTest {
public void seekActiveShards(){
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
- ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
+ CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class );
+ ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient );
String queueName = "queue_sit_" + RandomStringUtils.randomAlphanumeric( 10 );
@@ -114,8 +114,8 @@ public class ShardIteratorTest extends AbstractTest {
public void shardIteratorOrdering() throws Exception {
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
- ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
+ CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class );
+ ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient );
int numShards = 10;
String region = "default";
http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java
index e67db28..debfdd3 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java
@@ -19,7 +19,7 @@
package org.apache.usergrid.persistence.qakka.serialization.sharding;
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
import org.apache.usergrid.persistence.qakka.AbstractTest;
import org.apache.usergrid.persistence.qakka.core.CassandraClient;
@@ -47,8 +47,8 @@ public class ShardSerializationTest extends AbstractTest {
public void writeNewShard(){
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
- ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
+ CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class );
+ ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient );
Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
shardSerialization.createShard(shard1);
@@ -58,8 +58,8 @@ public class ShardSerializationTest extends AbstractTest {
public void deleteShard(){
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
- ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
+ CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class );
+ ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient );
Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
@@ -75,8 +75,8 @@ public class ShardSerializationTest extends AbstractTest {
public void loadNullShard(){
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
- ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
+ CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class );
+ ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient );
Shard shard1 = new Shard("junk", "region1", Shard.Type.DEFAULT, 100L, null);
@@ -90,8 +90,8 @@ public class ShardSerializationTest extends AbstractTest {
public void updatePointer(){
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
- ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
+ CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class );
+ ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient );
Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
shardSerialization.createShard(shard1);
[04/10] usergrid git commit: Complete message counter and add support
for getQueueDepth() in QakkaQueueManager
Posted by sn...@apache.org.
Complete message counter and add support for getQueueDepth() in QakkaQueueManager
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9306f12e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9306f12e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9306f12e
Branch: refs/heads/usergrid-1318-queue
Commit: 9306f12eea8cd8f0ad0b2ec4751cd8a9b9ba5382
Parents: 8b79fb8
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Sep 21 08:34:28 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Sep 21 08:34:28 2016 -0400
----------------------------------------------------------------------
.../usergrid/persistence/qakka/QakkaFig.java | 9 +-
.../usergrid/persistence/qakka/QakkaModule.java | 29 ++--
.../qakka/core/QueueMessageManager.java | 4 +
.../core/impl/QueueMessageManagerImpl.java | 13 +-
.../qakka/distributed/actors/QueueActor.java | 14 +-
.../distributed/actors/QueueTimeouter.java | 8 +
.../qakka/distributed/actors/QueueWriter.java | 138 ++++++++--------
.../impl/DistributedQueueServiceImpl.java | 6 +-
.../MessageCounterSerialization.java | 4 +-
.../impl/MessageCounterSerializationImpl.java | 159 +++++++++++++------
.../queue/impl/QakkaQueueManager.java | 4 +-
.../qakka/core/QueueMessageManagerTest.java | 2 +
.../impl/MessageCounterSerializationTest.java | 90 +++++++++++
.../sharding/ShardCounterSerializationTest.java | 3 -
.../queue/src/test/resources/qakka.properties | 1 +
15 files changed, 347 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index c66001d..c3f4189 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -52,7 +52,9 @@ public interface QakkaFig extends GuicyFig, Serializable {
String QUEUE_GET_TIMEOUT = "queue.get.timeout.seconds";
- String QUEUE_MAX_SHARD_COUNTER = "queue.max.inmemory.shard.counter";
+ String QUEUE_MAX_SHARD_COUNTER = "queue.max.inmemory.max.shard.counter";
+
+ String QUEUE_MAX_MESSAGE_CHANGES = "queue.max.inmemory.max.message.changes";
String QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY = "queue.shard.allocation.check.frequency.millis";
@@ -125,6 +127,11 @@ public interface QakkaFig extends GuicyFig, Serializable {
@Default("100")
long getMaxInMemoryShardCounter();
+ /** Once counter reaches this value, write it to permanent storage */
+ @Key(QUEUE_MAX_MESSAGE_CHANGES)
+ @Default("100")
+ long getMaxInMemoryMessageCounter();
+
/** How often to check whether new shard is needed for each queue */
@Key(QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY)
@Default("5000")
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
index d1d8d7e..e3113e1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
@@ -37,7 +37,9 @@ import org.apache.usergrid.persistence.qakka.distributed.impl.QueueSenderRouterP
import org.apache.usergrid.persistence.qakka.distributed.impl.QueueWriterRouterProducer;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.impl.AuditLogSerializationImpl;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl.MessageCounterSerializationImpl;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl.QueueMessageSerializationImpl;
import org.apache.usergrid.persistence.qakka.serialization.queues.QueueSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queues.impl.QueueSerializationImpl;
@@ -76,23 +78,24 @@ public class QakkaModule extends AbstractModule {
bind( App.class );
- bind( CassandraClient.class ).to( CassandraClientImpl.class );
- bind( MetricsService.class ).to( App.class );
+ bind( CassandraClient.class ).to( CassandraClientImpl.class );
+ bind( MetricsService.class ).to( App.class );
- bind( QueueManager.class ).to( QueueManagerImpl.class );
- bind( QueueSerialization.class ).to( QueueSerializationImpl.class );
+ bind( QueueManager.class ).to( QueueManagerImpl.class );
+ bind( QueueSerialization.class ).to( QueueSerializationImpl.class );
- bind( QueueMessageManager.class ).to( QueueMessageManagerImpl.class );
- bind( QueueMessageSerialization.class ).to( QueueMessageSerializationImpl.class );
+ bind( QueueMessageManager.class ).to( QueueMessageManagerImpl.class );
+ bind( QueueMessageSerialization.class ).to( QueueMessageSerializationImpl.class );
- bind( ShardSerialization.class ).to( ShardSerializationImpl.class );
- bind( ShardStrategy.class ).to( ShardStrategyImpl.class );
+ bind( ShardSerialization.class ).to( ShardSerializationImpl.class );
+ bind( ShardStrategy.class ).to( ShardStrategyImpl.class );
- bind( ShardCounterSerialization.class ).to( ShardCounterSerializationImpl.class );
+ bind( ShardCounterSerialization.class ).to( ShardCounterSerializationImpl.class );
+ bind( MessageCounterSerialization.class ).to( MessageCounterSerializationImpl.class );
- bind( TransferLogSerialization.class ).to( TransferLogSerializationImpl.class );
- bind( AuditLogSerialization.class ).to( AuditLogSerializationImpl.class );
- bind( DistributedQueueService.class ).to( DistributedQueueServiceImpl.class );
+ bind( TransferLogSerialization.class ).to( TransferLogSerializationImpl.class );
+ bind( AuditLogSerialization.class ).to( AuditLogSerializationImpl.class );
+ bind( DistributedQueueService.class ).to( DistributedQueueServiceImpl.class );
bind( QueueActorRouterProducer.class );
bind( QueueWriterRouterProducer.class );
@@ -110,6 +113,6 @@ public class QakkaModule extends AbstractModule {
migrationBinder.addBinding().to( Key.get( ShardCounterSerialization.class ) );
migrationBinder.addBinding().to( Key.get( ShardSerialization.class ) );
migrationBinder.addBinding().to( Key.get( TransferLogSerialization.class ) );
- //migrationBinder.addBinding().to( Key.get( MessageCounterSerialization.class ) );
+ migrationBinder.addBinding().to( Key.get( MessageCounterSerialization.class ) );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
index 15203d8..b540fce 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
@@ -19,6 +19,8 @@
package org.apache.usergrid.persistence.qakka.core;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+
import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;
@@ -80,4 +82,6 @@ public interface QueueMessageManager {
* Get message from messages available or messages inflight storage.
*/
QueueMessage getMessage(String queueName, UUID queueMessageId);
+
+ long getQueueDepth(String queueName);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
index bcd0f58..691c1a6 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
@@ -33,6 +33,7 @@ import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
import org.slf4j.Logger;
@@ -58,6 +59,7 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
private final DistributedQueueService distributedQueueService;
private final TransferLogSerialization transferLogSerialization;
private final URIStrategy uriStrategy;
+ private final MessageCounterSerialization messageCounterSerialization;
@Inject
@@ -67,8 +69,8 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
QueueMessageSerialization queueMessageSerialization,
DistributedQueueService distributedQueueService,
TransferLogSerialization transferLogSerialization,
- URIStrategy uriStrategy
- ) {
+ URIStrategy uriStrategy,
+ MessageCounterSerialization messageCounterSerialization ) {
this.actorSystemFig = actorSystemFig;
this.queueManager = queueManager;
@@ -76,6 +78,7 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
this.distributedQueueService = distributedQueueService;
this.transferLogSerialization = transferLogSerialization;
this.uriStrategy = uriStrategy;
+ this.messageCounterSerialization = messageCounterSerialization;
}
@@ -296,4 +299,10 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
return queueMessage;
}
+
+ @Override
+ public long getQueueDepth(String queueName) {
+ return messageCounterSerialization.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT );
+ }
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 6fed13b..3b50711 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -32,6 +32,7 @@ import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
import org.apache.usergrid.persistence.qakka.distributed.messages.*;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
@@ -46,11 +47,13 @@ import java.util.concurrent.TimeUnit;
public class QueueActor extends UntypedActor {
private static final Logger logger = LoggerFactory.getLogger( QueueActor.class );
- private final QakkaFig qakkaFig;
- private final InMemoryQueue inMemoryQueue;
+ private final QakkaFig qakkaFig;
+ private final InMemoryQueue inMemoryQueue;
private final QueueActorHelper queueActorHelper;
private final MetricsService metricsService;
+ private final MessageCounterSerialization messageCounterSerialization;
+
private final Map<String, Cancellable> refreshSchedulersByQueueName = new HashMap<>();
private final Map<String, Cancellable> timeoutSchedulersByQueueName = new HashMap<>();
private final Map<String, Cancellable> shardAllocationSchedulersByQueueName = new HashMap<>();
@@ -68,6 +71,8 @@ public class QueueActor extends UntypedActor {
inMemoryQueue = injector.getInstance( InMemoryQueue.class );
queueActorHelper = injector.getInstance( QueueActorHelper.class );
metricsService = injector.getInstance( MetricsService.class );
+
+ messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
}
@Override
@@ -173,6 +178,11 @@ public class QueueActor extends UntypedActor {
}
}
+ messageCounterSerialization.decrementCounter(
+ queueGetRequest.getQueueName(),
+ DatabaseQueueMessage.Type.DEFAULT,
+ queueMessages.size());
+
getSender().tell( new QueueGetResponse(
DistributedQueueService.Status.SUCCESS, queueMessages ), getSender() );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
index fcd2161..7806d30 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -33,6 +33,7 @@ import org.apache.usergrid.persistence.qakka.distributed.messages.QueueTimeoutRe
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
@@ -54,6 +55,8 @@ public class QueueTimeouter extends UntypedActor {
private final QakkaFig qakkaFig;
private final CassandraClient cassandraClient;
+ private final MessageCounterSerialization messageCounterSerialization;
+
public QueueTimeouter(String queueName ) {
this.queueName = queueName;
@@ -65,6 +68,8 @@ public class QueueTimeouter extends UntypedActor {
qakkaFig = injector.getInstance( QakkaFig.class );
metricsService = injector.getInstance( MetricsService.class );
cassandraClient = injector.getInstance( CassandraClientImpl.class );
+
+ messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
}
@@ -134,6 +139,9 @@ public class QueueTimeouter extends UntypedActor {
if (count > 0) {
logger.debug( "Timed out {} messages for queue {}", count, queueName );
+
+ messageCounterSerialization.decrementCounter(
+ queueName, DatabaseQueueMessage.Type.DEFAULT, count);
}
} finally {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index 8657370..7166ef1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -30,6 +30,7 @@ import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResp
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
import org.slf4j.Logger;
@@ -48,6 +49,8 @@ public class QueueWriter extends UntypedActor {
private final AuditLogSerialization auditLogSerialization;
private final MetricsService metricsService;
+ private final MessageCounterSerialization messageCounterSerialization;
+
public QueueWriter() {
@@ -57,96 +60,101 @@ public class QueueWriter extends UntypedActor {
transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
metricsService = injector.getInstance( MetricsService.class );
+
+ messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
}
@Override
public void onReceive(Object message) {
- if (message instanceof QueueWriteRequest) {
-
- Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_WRITE ).time();
+ if (message instanceof QueueWriteRequest) {
- try {
- QueueWriteRequest qa = (QueueWriteRequest) message;
-
- UUID queueMessageId = QakkaUtils.getTimeUuid();
+ Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_WRITE ).time();
- // TODO: implement deliveryTime and expirationTime
+ try {
+ QueueWriteRequest qa = (QueueWriteRequest) message;
- DatabaseQueueMessage dbqm = null;
- long currentTime = System.currentTimeMillis();
+ UUID queueMessageId = QakkaUtils.getTimeUuid();
- try {
- dbqm = new DatabaseQueueMessage(
- qa.getMessageId(),
- DatabaseQueueMessage.Type.DEFAULT,
- qa.getQueueName(),
- qa.getDestRegion(),
- null,
- currentTime,
- currentTime,
- queueMessageId );
+ // TODO: implement deliveryTime and expirationTime
- messageSerialization.writeMessage( dbqm );
+ DatabaseQueueMessage dbqm = null;
+ long currentTime = System.currentTimeMillis();
- //logger.debug("Wrote queue message id {} to queue name {}",
- // dbqm.getQueueMessageId(), dbqm.getQueueName());
+ try {
+ dbqm = new DatabaseQueueMessage(
+ qa.getMessageId(),
+ DatabaseQueueMessage.Type.DEFAULT,
+ qa.getQueueName(),
+ qa.getDestRegion(),
+ null,
+ currentTime,
+ currentTime,
+ queueMessageId );
- } catch (Throwable t) {
- logger.debug("Error creating database queue message", t);
+ messageSerialization.writeMessage( dbqm );
- auditLogSerialization.recordAuditLog(
- AuditLog.Action.SEND,
- AuditLog.Status.ERROR,
- qa.getQueueName(),
- qa.getDestRegion(),
- qa.getMessageId(),
- dbqm.getMessageId() );
+ messageCounterSerialization.incrementCounter(
+ qa.getQueueName(), DatabaseQueueMessage.Type.DEFAULT, 1);
- getSender().tell( new QueueWriteResponse(
- QueueWriter.WriteStatus.ERROR ), getSender() );
+ //logger.debug("Wrote queue message id {} to queue name {}",
+ // dbqm.getQueueMessageId(), dbqm.getQueueName());
- return;
- }
+ } catch (Throwable t) {
+ logger.debug("Error creating database queue message", t);
auditLogSerialization.recordAuditLog(
AuditLog.Action.SEND,
- AuditLog.Status.SUCCESS,
+ AuditLog.Status.ERROR,
qa.getQueueName(),
qa.getDestRegion(),
qa.getMessageId(),
- dbqm.getQueueMessageId() );
-
- try {
- transferLogSerialization.removeTransferLog(
- qa.getQueueName(),
- qa.getSourceRegion(),
- qa.getDestRegion(),
- qa.getMessageId() );
-
- getSender().tell( new QueueWriteResponse(
- QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED ), getSender() );
-
- } catch (Throwable e) {
- logger.debug( "Unable to delete transfer log for {} {} {} {}",
- qa.getQueueName(),
- qa.getSourceRegion(),
- qa.getDestRegion(),
- qa.getMessageId() );
- logger.debug("Error deleting transferlog", e);
-
- getSender().tell( new QueueWriteResponse(
- QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ), getSender() );
- }
-
- } finally {
- timer.close();
+ dbqm.getMessageId() );
+
+ getSender().tell( new QueueWriteResponse(
+ QueueWriter.WriteStatus.ERROR ), getSender() );
+
+ return;
}
- } else {
- unhandled( message );
+ auditLogSerialization.recordAuditLog(
+ AuditLog.Action.SEND,
+ AuditLog.Status.SUCCESS,
+ qa.getQueueName(),
+ qa.getDestRegion(),
+ qa.getMessageId(),
+ dbqm.getQueueMessageId() );
+
+ try {
+ transferLogSerialization.removeTransferLog(
+ qa.getQueueName(),
+ qa.getSourceRegion(),
+ qa.getDestRegion(),
+ qa.getMessageId() );
+
+ getSender().tell( new QueueWriteResponse(
+ QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED ), getSender() );
+
+ } catch (Throwable e) {
+ logger.debug( "Unable to delete transfer log for {} {} {} {}",
+ qa.getQueueName(),
+ qa.getSourceRegion(),
+ qa.getDestRegion(),
+ qa.getMessageId() );
+ logger.debug("Error deleting transferlog", e);
+
+ getSender().tell( new QueueWriteResponse(
+ QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ), getSender() );
+ }
+
+ } finally {
+ timer.close();
}
+ } else {
+ unhandled( message );
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index be20cde..3d6a808 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService
import org.apache.usergrid.persistence.qakka.distributed.messages.*;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
@@ -54,17 +55,20 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
private final ActorSystemManager actorSystemManager;
private final QueueManager queueManager;
private final QakkaFig qakkaFig;
+ private final MessageCounterSerialization messageCounterSerialization;
@Inject
public DistributedQueueServiceImpl(
ActorSystemManager actorSystemManager,
QueueManager queueManager,
- QakkaFig qakkaFig ) {
+ QakkaFig qakkaFig,
+ MessageCounterSerialization messageCounterSerialization ) {
this.actorSystemManager = actorSystemManager;
this.queueManager = queueManager;
this.qakkaFig = qakkaFig;
+ this.messageCounterSerialization = messageCounterSerialization;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
index cbbf11f..6c81863 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
@@ -21,11 +21,11 @@ package org.apache.usergrid.persistence.qakka.serialization.queuemessages;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
-public interface MessageCounterSerialization extends Migration {
+public interface MessageCounterSerialization extends Migration {
void incrementCounter(String queueName, DatabaseQueueMessage.Type type, long increment);
- void decrementCounter(String queueName, DatabaseQueueMessage.Type type, long increment);
+ void decrementCounter(String queueName, DatabaseQueueMessage.Type type, long decrement);
long getCounterValue(String name, DatabaseQueueMessage.Type type);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
index f198d05..0fdb47e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
@@ -32,8 +32,8 @@ import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.core.CassandraClient;
import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,49 +43,56 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@Singleton
-public class MessageCounterSerializationImpl implements ShardCounterSerialization {
+public class MessageCounterSerializationImpl implements MessageCounterSerialization {
private static final Logger logger = LoggerFactory.getLogger( MessageCounterSerializationImpl.class );
private final CassandraClient cassandraClient;
private final CassandraConfig cassandraConfig;
- final static String TABLE_SHARD_COUNTERS = "counters";
- final static String COLUMN_QUEUE_NAME = "queue_name";
- final static String COLUMN_SHARD_ID = "shard_id";
- final static String COLUMN_COUNTER_VALUE = "counter_value";
- final static String COLUMN_SHARD_TYPE = "shard_type";
+ final static String TABLE_MESSAGE_COUNTERS = "message_counters";
+ final static String COLUMN_QUEUE_NAME = "queue_name";
+ final static String COLUMN_COUNTER_VALUE = "counter_value";
+ final static String COLUMN_MESSAGE_TYPE = "message_type";
// design note: counters based on DataStax example here:
// https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_counter_t.html
static final String CQL =
- "CREATE TABLE IF NOT EXISTS shard_counters ( " +
+ "CREATE TABLE IF NOT EXISTS message_counters ( " +
"counter_value counter, " +
"queue_name varchar, " +
- "shard_type varchar, " +
- "shard_id bigint, " +
- "PRIMARY KEY (queue_name, shard_type, shard_id) " +
+ "message_type varchar, " +
+ "PRIMARY KEY (queue_name, message_type) " +
");";
- final long maxInMemoryIncrement;
+ /** number of changes since last save to database */
+ final AtomicInteger numChanges = new AtomicInteger( 0 );
+
+ final long maxChangesBeforeSave;
class InMemoryCount {
long baseCount;
final AtomicLong increment = new AtomicLong( 0L );
+ final AtomicLong decrement = new AtomicLong( 0L );
+
InMemoryCount( long baseCount ) {
this.baseCount = baseCount;
}
- public long value() {
- return baseCount + increment.get();
- }
public AtomicLong getIncrement() {
return increment;
}
+ public AtomicLong getDecrement() {
+ return decrement;
+ }
+ public long value() {
+ return baseCount + increment.get() - decrement.get();
+ }
void setBaseCount( long baseCount ) {
this.baseCount = baseCount;
}
@@ -95,64 +102,89 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
@Inject
- public MessageCounterSerializationImpl( CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+ public MessageCounterSerializationImpl(
+ CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+
this.cassandraConfig = cassandraConfig;
- this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter();
+ this.maxChangesBeforeSave = qakkaFig.getMaxInMemoryMessageCounter();
this.cassandraClient = cassandraClient;
}
+ private String buildKey( String queueName, DatabaseQueueMessage.Type type ) {
+ return queueName + "_" + type;
+ }
+
+
@Override
- public void incrementCounter(String queueName, Shard.Type type, long shardId, long increment ) {
+ public void incrementCounter(String queueName, DatabaseQueueMessage.Type type, long increment ) {
+
+ String key = buildKey( queueName, type );
- String key = queueName + type + shardId;
synchronized ( inMemoryCounters ) {
if ( inMemoryCounters.get( key ) == null ) {
- Long value = retrieveCounterFromStorage( queueName, type, shardId );
+ Long value = retrieveCounterFromStorage( queueName, type );
if ( value == null ) {
- incrementCounterInStorage( queueName, type, shardId, 0L );
+ incrementCounterInStorage( queueName, type, 0L );
inMemoryCounters.put( key, new InMemoryCount( 0L ));
} else {
inMemoryCounters.put( key, new InMemoryCount( value ));
}
- inMemoryCounters.get( key ).getIncrement().addAndGet( increment );
- return;
}
}
InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+ inMemoryCount.getIncrement().addAndGet( increment );
- synchronized ( inMemoryCount ) {
- long totalIncrement = inMemoryCount.getIncrement().addAndGet( increment );
+ saveIfNeeded( queueName, type );
+ }
- if (totalIncrement > maxInMemoryIncrement) {
- incrementCounterInStorage( queueName, type, shardId, totalIncrement );
- inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type, shardId ) );
- inMemoryCount.getIncrement().set( 0L );
+
+ @Override
+ public void decrementCounter(String queueName, DatabaseQueueMessage.Type type, long decrement) {
+
+ String key = buildKey( queueName, type );
+
+ synchronized ( inMemoryCounters ) {
+
+ if ( inMemoryCounters.get( key ) == null ) {
+
+ Long value = retrieveCounterFromStorage( queueName, type );
+
+ if ( value == null ) {
+ decrementCounterInStorage( queueName, type, 0L );
+ inMemoryCounters.put( key, new InMemoryCount( 0L ));
+ } else {
+ inMemoryCounters.put( key, new InMemoryCount( value ));
+ }
}
}
+ InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+ inMemoryCount.getDecrement().addAndGet( decrement );
+
+ saveIfNeeded( queueName, type );
}
@Override
- public long getCounterValue( String queueName, Shard.Type type, long shardId ) {
+ public long getCounterValue( String queueName, DatabaseQueueMessage.Type type ) {
- String key = queueName + type + shardId;
+ String key = buildKey( queueName, type );
synchronized ( inMemoryCounters ) {
if ( inMemoryCounters.get( key ) == null ) {
- Long value = retrieveCounterFromStorage( queueName, type, shardId );
+ Long value = retrieveCounterFromStorage( queueName, type );
if ( value == null ) {
throw new NotFoundException(
- MessageFormat.format( "No counter found for queue {0} type {1} shardId {2}",
- queueName, type, shardId ));
+ MessageFormat.format( "No counter found for queue {0} type {1}",
+ queueName, type ));
} else {
inMemoryCounters.put( key, new InMemoryCount( value ));
}
@@ -162,30 +194,39 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
return inMemoryCounters.get( key ).value();
}
- void incrementCounterInStorage( String queueName, Shard.Type type, long shardId, long increment ) {
- Statement update = QueryBuilder.update( TABLE_SHARD_COUNTERS )
+ void incrementCounterInStorage( String queueName, DatabaseQueueMessage.Type type, long increment ) {
+
+ Statement update = QueryBuilder.update( TABLE_MESSAGE_COUNTERS )
.where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) )
- .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString() ) )
- .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) )
+ .and( QueryBuilder.eq( COLUMN_MESSAGE_TYPE, type.toString() ) )
.with( QueryBuilder.incr( COLUMN_COUNTER_VALUE, increment ) );
cassandraClient.getQueueMessageSession().execute( update );
}
- Long retrieveCounterFromStorage( String queueName, Shard.Type type, long shardId ) {
+ void decrementCounterInStorage( String queueName, DatabaseQueueMessage.Type type, long decrement ) {
- Statement query = QueryBuilder.select().from( TABLE_SHARD_COUNTERS )
+ Statement update = QueryBuilder.update( TABLE_MESSAGE_COUNTERS )
+ .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) )
+ .and( QueryBuilder.eq( COLUMN_MESSAGE_TYPE, type.toString() ) )
+ .with( QueryBuilder.decr( COLUMN_COUNTER_VALUE, decrement ) );
+ cassandraClient.getQueueMessageSession().execute( update );
+ }
+
+
+ Long retrieveCounterFromStorage( String queueName, DatabaseQueueMessage.Type type ) {
+
+ Statement query = QueryBuilder.select().from( TABLE_MESSAGE_COUNTERS )
.where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) )
- .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString()) )
- .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) );
+ .and( QueryBuilder.eq( COLUMN_MESSAGE_TYPE, type.toString()) );
ResultSet resultSet = cassandraClient.getQueueMessageSession().execute( query );
List<Row> all = resultSet.all();
if ( all.size() > 1 ) {
throw new QakkaRuntimeException(
- "Multiple rows for counter " + queueName + " type " + type + " shardId " + shardId );
+ "Multiple rows for counter " + queueName + " type " + type );
}
if ( all.isEmpty() ) {
return null;
@@ -194,6 +235,32 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
}
+ private void saveIfNeeded( String queueName, DatabaseQueueMessage.Type type ) {
+
+ String key = buildKey( queueName, type );
+
+ InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+
+ synchronized ( inMemoryCount ) {
+
+ if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) {
+
+ long totalIncrement = inMemoryCount.getIncrement().get();
+ incrementCounterInStorage( queueName, type, totalIncrement );
+
+ long totalDecrement = inMemoryCount.getDecrement().get();
+ decrementCounterInStorage( queueName, type, totalDecrement );
+
+ inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) );
+ inMemoryCount.getIncrement().set( 0L );
+ inMemoryCount.getDecrement().set( 0L );
+
+ numChanges.set( 0 );
+ }
+ }
+ }
+
+
@Override
public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
return Collections.EMPTY_LIST;
@@ -201,8 +268,8 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
@Override
public Collection<TableDefinition> getTables() {
- return Collections.singletonList(
- new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(), TABLE_SHARD_COUNTERS, CQL ) );
+ return Collections.singletonList( new TableDefinitionStringImpl(
+ cassandraConfig.getApplicationLocalKeyspace(), TABLE_MESSAGE_COUNTERS, CQL ) );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
index 0eb609d..f3cae86 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
@@ -24,6 +24,7 @@ import com.google.inject.assistedinject.Assisted;
import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.core.*;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.apache.usergrid.persistence.queue.LegacyQueueFig;
import org.apache.usergrid.persistence.queue.LegacyQueueManager;
import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
@@ -139,10 +140,9 @@ public class QakkaQueueManager implements LegacyQueueManager {
return messages;
}
-
@Override
public long getQueueDepth() {
- return 0;
+ return queueMessageManager.getQueueDepth( scope.getName() );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index 0413f81..3225a66 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -177,6 +177,8 @@ public class QueueMessageManagerTest extends AbstractTest {
}
}
+ Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) );
+
// get all messages from queue
List<QueueMessage> messages = qmm.getNextMessages( queueName, numMessages );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
new file mode 100644
index 0000000..a4ea0f1
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl;
+
+import com.google.inject.Injector;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+
+/**
+ * Created by Dave Johnson (snoopdave@apache.org) on 9/20/16.
+ */
+public class MessageCounterSerializationTest extends AbstractTest {
+
+ @Test
+ public void testBasicOperation() {
+
+ Injector injector = getInjector();
+ MessageCounterSerialization mcs = injector.getInstance( MessageCounterSerialization.class );
+
+ String queueName = "mcst_queue_" + RandomStringUtils.randomAlphanumeric( 20 );
+
+ try {
+ mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT );
+ fail("Should have throw NotFoundException");
+ } catch ( NotFoundException expected ) {
+ // pass
+ }
+
+ for ( int i=0; i<10; i++ ) {
+ mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 1 );
+ Assert.assertEquals( i+1, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+ }
+
+ mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+ Assert.assertEquals( 0, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+ mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+ Assert.assertEquals( 10, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+ mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+ Assert.assertEquals( 20, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+ mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+ Assert.assertEquals( 30, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+ mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+ Assert.assertEquals( 40, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+ mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+ Assert.assertEquals( 50, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+ mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 50 );
+ Assert.assertEquals( 100, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+ mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+ Assert.assertEquals( 90, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+ mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+ Assert.assertEquals( 80, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+ mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+ Assert.assertEquals( 70, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
index f9c2951..8dc16bb 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
@@ -36,9 +36,6 @@ public class ShardCounterSerializationTest extends AbstractTest {
@Test
public void testBasicOperation() throws Exception {
- CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
-
-
ShardCounterSerialization scs = getInjector().getInstance( ShardCounterSerialization.class );
String queueName = "scst_queue_" + RandomStringUtils.randomAlphanumeric( 20 );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index aacc187..fb46f3d 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -44,6 +44,7 @@ queue.shard.allocation.check.frequency.millis=100
queue.shard.allocation.advance.time.millis=200
queue.max.inmemory.shard.counter = 100
+queue.max.inmemory.max.message.changes=3
queue.long.polling.time.millis=2000
[09/10] usergrid git commit: Change to use proper Guice injection
instead of static injector kludge.
Posted by sn...@apache.org.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
index 19c1211..ba3c0f8 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
@@ -22,8 +22,11 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
@@ -52,10 +55,7 @@ public class QueueReaderTest extends AbstractTest {
@Test
public void testBasicOperation() throws Exception {
- CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
-
-
- getInjector().getInstance( App.class ); // init the INJECTOR
+ Injector injector = getInjector();
QakkaFig qakkaFig = getInjector().getInstance( QakkaFig.class );
ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
@@ -94,7 +94,8 @@ public class QueueReaderTest extends AbstractTest {
// run the QueueRefresher to fill up the in-memory queue
ActorSystem system = ActorSystem.create("Test-" + queueName);
- ActorRef queueReaderRef = system.actorOf( Props.create( QueueRefresher.class, queueName ), "queueReader");
+ ActorRef queueReaderRef = system.actorOf(
+ Props.create( GuiceActorProducer.class, injector, QueueRefresher.class ), "queueReader");
QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName, false );
// need to wait for refresh to complete
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
index e3541a4..3079773 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
@@ -22,8 +22,10 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
+import com.google.inject.Injector;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
import org.apache.usergrid.persistence.qakka.AbstractTest;
import org.apache.usergrid.persistence.qakka.App;
import org.apache.usergrid.persistence.qakka.QakkaFig;
@@ -58,7 +60,7 @@ public class QueueTimeouterTest extends AbstractTest {
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- getInjector().getInstance( App.class ); // init the INJECTOR
+ Injector injector = getInjector();
QakkaFig qakkaFig = getInjector().getInstance( QakkaFig.class );
ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
@@ -110,7 +112,8 @@ public class QueueTimeouterTest extends AbstractTest {
// run timeouter actor
ActorSystem system = ActorSystem.create("Test-" + queueName);
- ActorRef timeouterRef = system.actorOf( Props.create( QueueTimeouter.class, queueName ), "timeouter");
+ ActorRef timeouterRef = system.actorOf( Props.create(
+ GuiceActorProducer.class, injector, QueueTimeouter.class), "timeouter");
QueueTimeoutRequest qtr = new QueueTimeoutRequest( queueName );
timeouterRef.tell( qtr, null ); // tell sends message, returns immediately
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
index 7fd664f..b602177 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
@@ -28,6 +28,7 @@ import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
import org.apache.usergrid.persistence.qakka.AbstractTest;
import org.apache.usergrid.persistence.qakka.App;
import org.apache.usergrid.persistence.qakka.QakkaModule;
@@ -111,7 +112,8 @@ public class ShardAllocatorTest extends AbstractTest {
// Run shard allocator actor by sending message to it
ActorSystem system = ActorSystem.create("Test-" + queueName);
- ActorRef shardAllocRef = system.actorOf( Props.create( ShardAllocator.class, queueName ), "shardallocator");
+ ActorRef shardAllocRef = system.actorOf( Props.create(
+ GuiceActorProducer.class, injector, ShardAllocator.class), "shardallocator");
ShardCheckRequest checkRequest = new ShardCheckRequest( queueName );
shardAllocRef.tell( checkRequest, null ); // tell sends message, returns immediately
@@ -187,26 +189,32 @@ public class ShardAllocatorTest extends AbstractTest {
queueManager.createQueue( new Queue( queueName ));
- // Create 4000 messages
+ try {
- int numMessages = 4000;
+ // Create 4000 messages
- for ( int i=0; i<numMessages; i++ ) {
- queueMessageManager.sendMessages(
+ int numMessages = 4000;
+
+ for (int i = 0; i < numMessages; i++) {
+ queueMessageManager.sendMessages(
queueName,
Collections.singletonList( region ),
null, // delay
null, // expiration
"application/json",
DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) );
- }
+ }
- distributedQueueService.refresh();
- Thread.sleep(3000);
+ distributedQueueService.refresh();
+ Thread.sleep( 3000 );
- // Test that 8 shards were created
+ // Test that 8 shards were created
- Assert.assertTrue("num shards >= 7",
- countShards( cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT ) >= 7 );
+ Assert.assertTrue( "num shards >= 7",
+ countShards( cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT ) >= 7 );
+
+ } finally {
+ queueManager.deleteQueue( queueName );
+ }
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/log4j.properties b/stack/corepersistence/queue/src/test/resources/log4j.properties
index b207ea3..e1cbda4 100644
--- a/stack/corepersistence/queue/src/test/resources/log4j.properties
+++ b/stack/corepersistence/queue/src/test/resources/log4j.properties
@@ -28,6 +28,6 @@ log4j.logger.org.glassfish=WARN
#log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG
#log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
-log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
-log4j.logger.org.apache.usergrid.persistence.queue=DEBUG
-log4j.logger.org.apache.usergrid.corepersistence.asyncevents=DEBUG
+log4j.logger.org.apache.usergrid.persistence.qakka.distributed.actors=DEBUG
+log4j.logger.org.apache.usergrid.persistence.queue=INFO
+log4j.logger.org.apache.usergrid.corepersistence.asyncevents=INFO
[05/10] usergrid git commit: When writing to queue,
if in-memory queue is empty then refresh will be done.
Posted by sn...@apache.org.
When writing to queue, if in-memory queue is empty then refresh will be done.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/727ff1d6
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/727ff1d6
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/727ff1d6
Branch: refs/heads/usergrid-1318-queue
Commit: 727ff1d642fdaf3768ab2b1f0c07364854ec2e60
Parents: 9306f12
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Sep 21 13:48:47 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Sep 21 13:48:47 2016 -0400
----------------------------------------------------------------------
.../org/apache/usergrid/persistence/qakka/QakkaFig.java | 2 +-
.../persistence/qakka/core/impl/InMemoryQueue.java | 4 ++++
.../persistence/qakka/distributed/actors/QueueActor.java | 11 +++++++----
.../qakka/distributed/actors/QueueWriter.java | 3 +++
.../distributed/impl/DistributedQueueServiceImpl.java | 8 ++++++--
.../qakka/distributed/messages/QueueRefreshRequest.java | 8 +++++++-
.../persistence/qakka/core/QueueMessageManagerTest.java | 1 +
.../qakka/distributed/actors/QueueReaderTest.java | 2 +-
8 files changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index c3f4189..da47c98 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -94,7 +94,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
/** How often to refresh each queue's in-memory data */
@Key(QUEUE_REFRESH_MILLISECONDS)
- @Default("500")
+ @Default("1000")
int getQueueRefreshMilliseconds();
/** How many queue messages to keep in-memory */
http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
index 474ef5c..27de079 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
@@ -78,6 +78,10 @@ public class InMemoryQueue {
return getQueue( queueName ).poll();
}
+ public DatabaseQueueMessage peek( String queueName ) {
+ return getQueue( queueName ).peek();
+ }
+
public int size( String queueName ) {
return getQueue( queueName ).size();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 3b50711..315975c 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -86,7 +86,7 @@ public class QueueActor extends UntypedActor {
Duration.create( 0, TimeUnit.MILLISECONDS),
Duration.create( qakkaFig.getQueueRefreshMilliseconds(), TimeUnit.MILLISECONDS),
self(),
- new QueueRefreshRequest( request.getQueueName() ),
+ new QueueRefreshRequest( request.getQueueName(), false ),
getContext().dispatcher(),
getSelf());
refreshSchedulersByQueueName.put( request.getQueueName(), scheduler );
@@ -121,9 +121,12 @@ public class QueueActor extends UntypedActor {
QueueRefreshRequest request = (QueueRefreshRequest)message;
if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) {
- ActorRef readerRef = getContext().actorOf( Props.create(
- QueueRefresher.class, request.getQueueName()), request.getQueueName() + "_reader");
- queueReadersByQueueName.put( request.getQueueName(), readerRef );
+
+ if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) {
+ ActorRef readerRef = getContext().actorOf( Props.create(
+ QueueRefresher.class, request.getQueueName()), request.getQueueName() + "_reader");
+ queueReadersByQueueName.put( request.getQueueName(), readerRef );
+ }
}
// hand-off to queue's reader
http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index 7166ef1..e54d916 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -25,6 +25,7 @@ import com.google.inject.Injector;
import org.apache.usergrid.persistence.qakka.App;
import org.apache.usergrid.persistence.qakka.MetricsService;
import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResponse;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
@@ -44,6 +45,7 @@ public class QueueWriter extends UntypedActor {
public enum WriteStatus { SUCCESS_XFERLOG_DELETED, SUCCESS_XFERLOG_NOTDELETED, ERROR };
+ private final DistributedQueueService distributedQueueService;
private final QueueMessageSerialization messageSerialization;
private final TransferLogSerialization transferLogSerialization;
private final AuditLogSerialization auditLogSerialization;
@@ -61,6 +63,7 @@ public class QueueWriter extends UntypedActor {
auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
metricsService = injector.getInstance( MetricsService.class );
+ distributedQueueService = injector.getInstance( DistributedQueueService.class );
messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index 3d6a808..9d71c31 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -25,7 +25,6 @@ import akka.util.Timeout;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import org.apache.log4j.net.SyslogAppender;
import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
import org.apache.usergrid.persistence.actorsystem.ClientActor;
import org.apache.usergrid.persistence.qakka.QakkaFig;
@@ -114,7 +113,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
@Override
public void refreshQueue(String queueName) {
logger.info("{} Requesting refresh for queue: {}", this, queueName);
- QueueRefreshRequest request = new QueueRefreshRequest( queueName );
+ QueueRefreshRequest request = new QueueRefreshRequest( queueName, false );
ActorRef clientActor = actorSystemManager.getClientActor();
clientActor.tell( request, null );
}
@@ -168,6 +167,11 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
if ( retries > 1 ) {
logger.debug("SUCCESS after {} retries", retries );
}
+
+ logger.debug("{} Requesting refresh if empty for queue: {}", this, queueName);
+ QueueRefreshRequest qrr = new QueueRefreshRequest( queueName, false );
+ clientActor.tell( qrr, null );
+
return qarm.getSendStatus();
} else {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
index a81a6fd..b65ad3d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
@@ -24,10 +24,16 @@ import org.apache.commons.lang3.builder.ToStringBuilder;
public class QueueRefreshRequest implements QakkaMessage {
private final String queueName;
+ private final boolean onlyIfEmpty;
- public QueueRefreshRequest(String queueName ) {
+ public QueueRefreshRequest( String queueName, boolean onlyIfEmpty ) {
this.queueName = queueName;
+ this.onlyIfEmpty = onlyIfEmpty;
+ }
+
+ public boolean isOnlyIfEmpty() {
+ return onlyIfEmpty;
}
public String getQueueName() {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index 3225a66..c10d1f5 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -175,6 +175,7 @@ public class QueueMessageManagerTest extends AbstractTest {
if (inMemoryQueue.size( queueName ) == 40) {
break;
}
+ Thread.sleep( 500 );
}
Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
index 0b8b795..19c1211 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
@@ -95,7 +95,7 @@ public class QueueReaderTest extends AbstractTest {
ActorSystem system = ActorSystem.create("Test-" + queueName);
ActorRef queueReaderRef = system.actorOf( Props.create( QueueRefresher.class, queueName ), "queueReader");
- QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName );
+ QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName, false );
// need to wait for refresh to complete
int maxRetries = 10;
[10/10] usergrid git commit: Change to use proper Guice injection
instead of static injector kludge.
Posted by sn...@apache.org.
Change to use proper Guice injection instead of static injector kludge.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5a19ba9a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5a19ba9a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5a19ba9a
Branch: refs/heads/usergrid-1318-queue
Commit: 5a19ba9a748c5d96f5356d77df1e7845aa92fc0f
Parents: 2cd8ecb
Author: Dave Johnson <sn...@apache.org>
Authored: Fri Sep 30 17:18:24 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Fri Sep 30 17:18:24 2016 -0400
----------------------------------------------------------------------
build.log | 15 ++
.../apache/usergrid/persistence/qakka/App.java | 17 +-
.../qakka/core/impl/InMemoryQueue.java | 11 +-
.../core/impl/QueueMessageManagerImpl.java | 18 +-
.../distributed/DistributedQueueService.java | 2 +-
.../qakka/distributed/actors/QueueActor.java | 56 +++---
.../distributed/actors/QueueActorHelper.java | 105 ++++++++++-
.../distributed/actors/QueueActorRouter.java | 11 +-
.../distributed/actors/QueueRefresher.java | 120 +-----------
.../qakka/distributed/actors/QueueSender.java | 5 +-
.../distributed/actors/QueueSenderRouter.java | 10 +-
.../distributed/actors/QueueTimeouter.java | 26 +--
.../qakka/distributed/actors/QueueWriter.java | 6 +-
.../distributed/actors/QueueWriterRouter.java | 11 +-
.../distributed/actors/ShardAllocator.java | 22 +--
.../impl/DistributedQueueServiceImpl.java | 26 ++-
.../qakka/core/QueueMessageManagerTest.java | 178 +++++++++---------
.../distributed/QueueActorServiceTest.java | 112 ++++++-----
.../actors/QueueActorHelperTest.java | 186 +++++++++++--------
.../distributed/actors/QueueReaderTest.java | 11 +-
.../distributed/actors/QueueTimeouterTest.java | 7 +-
.../distributed/actors/ShardAllocatorTest.java | 30 +--
.../queue/src/test/resources/log4j.properties | 6 +-
23 files changed, 531 insertions(+), 460 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/build.log
----------------------------------------------------------------------
diff --git a/build.log b/build.log
new file mode 100644
index 0000000..43ffacd
--- /dev/null
+++ b/build.log
@@ -0,0 +1,15 @@
+[INFO] Scanning for projects...
+[INFO] ------------------------------------------------------------------------
+[INFO] BUILD FAILURE
+[INFO] ------------------------------------------------------------------------
+[INFO] Total time: 0.086 s
+[INFO] Finished at: 2016-09-30T07:54:28-04:00
+[INFO] Final Memory: 46M/6710M
+[INFO] ------------------------------------------------------------------------
+[ERROR] The goal you specified requires a project to execute but there is no POM in this directory (/Users/ApigeeCorporation/src/usergrid-snoopdave). Please verify you invoked Maven from the correct directory. -> [Help 1]
+[ERROR]
+[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
+[ERROR] Re-run Maven using the -X switch to enable full debug logging.
+[ERROR]
+[ERROR] For more information about the errors and possible solutions, please read the following articles:
+[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MissingProjectException
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
index abbf3da..35fdb20 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
@@ -44,9 +44,6 @@ import org.slf4j.LoggerFactory;
public class App implements MetricsService {
private static final Logger logger = LoggerFactory.getLogger( App.class );
- // TODO: can we avoid this kludge with better Akka-Guice integration?
- static public Injector INJECTOR;
-
private final ActorSystemFig actorSystemFig;
private final ActorSystemManager actorSystemManager;
private final DistributedQueueService distributedQueueService;
@@ -55,14 +52,16 @@ public class App implements MetricsService {
@Inject
public App(
- Injector injector,
QakkaFig qakkaFig,
ActorSystemFig actorSystemFig,
ActorSystemManager actorSystemManager,
DistributedQueueService distributedQueueService,
- MigrationManager migrationManager) {
+ MigrationManager migrationManager,
+ QueueActorRouterProducer queueActorRouterProducer,
+ QueueWriterRouterProducer queueWriterRouterProducer,
+ QueueSenderRouterProducer queueSenderRouterProducer
+ ) {
- this.INJECTOR = injector;
this.actorSystemFig = actorSystemFig;
this.actorSystemManager = actorSystemManager;
this.distributedQueueService = distributedQueueService;
@@ -74,9 +73,9 @@ public class App implements MetricsService {
} catch (MigrationException e) {
throw new QakkaRuntimeException( "Error running migration", e );
}
- actorSystemManager.registerRouterProducer( injector.getInstance( QueueActorRouterProducer.class ) );
- actorSystemManager.registerRouterProducer( injector.getInstance( QueueWriterRouterProducer.class ) );
- actorSystemManager.registerRouterProducer( injector.getInstance( QueueSenderRouterProducer.class ) );
+ actorSystemManager.registerRouterProducer( queueActorRouterProducer );
+ actorSystemManager.registerRouterProducer( queueWriterRouterProducer );
+ actorSystemManager.registerRouterProducer( queueSenderRouterProducer );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
index 27de079..1f6fe6e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
@@ -19,8 +19,10 @@
package org.apache.usergrid.persistence.qakka.core.impl;
+import com.datastax.driver.core.utils.UUIDs;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.distributed.actors.QueueRefresher;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
@@ -71,7 +73,14 @@ public class InMemoryQueue {
}
public UUID getNewest( String queueName ) {
- return newestByQueueName.get( queueName );
+ UUID newest = newestByQueueName.get( queueName );
+// if ( newest == null ) {
+// // Create oldest UUID from a UNIX timestamp via DataStax utility
+// // https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/utils/UUIDs.html
+// newest = UUIDs.startOf( 0L );
+// newestByQueueName.put( queueName, newest );
+// }
+ return newest;
}
public DatabaseQueueMessage poll( String queueName ) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
index 691c1a6..59e0ce0 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
@@ -138,10 +138,6 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
@Override
public List<QueueMessage> getNextMessages(String queueName, int count) {
- if ( queueManager.getQueueConfig( queueName ) == null ) {
- throw new NotFoundException( "Queue not found: " + queueName );
- }
-
Collection<DatabaseQueueMessage> dbMessages = distributedQueueService.getNextMessages( queueName, count );
List<QueueMessage> queueMessages = joinMessages( queueName, dbMessages );
@@ -210,15 +206,14 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
@Override
public void ackMessage(String queueName, UUID queueMessageId) {
- if ( queueManager.getQueueConfig( queueName ) == null ) {
- throw new NotFoundException( "Queue not found: " + queueName );
- }
-
DistributedQueueService.Status status = distributedQueueService.ackMessage( queueName, queueMessageId );
- if ( DistributedQueueService.Status.BAD_REQUEST.equals( status )) {
+ if ( DistributedQueueService.Status.NOT_INFLIGHT.equals( status )) {
throw new BadRequestException( "Message not inflight" );
+ } else if ( DistributedQueueService.Status.BAD_REQUEST.equals( status )) {
+ throw new BadRequestException( "Bad request" );
+
} else if ( DistributedQueueService.Status.ERROR.equals( status )) {
throw new QakkaRuntimeException( "Unable to ack message due to error" );
}
@@ -228,10 +223,6 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
@Override
public void requeueMessage(String queueName, UUID messageId, Long delayMs) {
- if ( queueManager.getQueueConfig( queueName ) == null ) {
- throw new NotFoundException( "Queue not found: " + queueName );
- }
-
// TODO: implement requeueMessage
throw new UnsupportedOperationException( "requeueMessage not yet implemented" );
@@ -268,7 +259,6 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
// first look in INFLIGHT storage
-
DatabaseQueueMessage dbMessage = queueMessageSerialization.loadMessage(
queueName, actorSystemFig.getRegionLocal(), null,
DatabaseQueueMessage.Type.INFLIGHT, queueMessageId );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
index b02a623..b11dcff 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
@@ -30,7 +30,7 @@ import java.util.UUID;
*/
public interface DistributedQueueService {
- enum Status { SUCCESS, ERROR, BAD_REQUEST };
+ enum Status { SUCCESS, ERROR, BAD_REQUEST, NOT_INFLIGHT };
void init();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 87342ad..5ebba3d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -24,7 +24,9 @@ import akka.actor.Cancellable;
import akka.actor.Props;
import akka.actor.UntypedActor;
import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
import org.apache.usergrid.persistence.qakka.App;
import org.apache.usergrid.persistence.qakka.MetricsService;
import org.apache.usergrid.persistence.qakka.QakkaFig;
@@ -65,10 +67,13 @@ public class QueueActor extends UntypedActor {
private final AtomicLong messageCount = new AtomicLong(0);
private final Set<String> queuesSeen = new HashSet<>();
+ private final Injector injector;
- public QueueActor() {
- Injector injector = App.INJECTOR;
+ @Inject
+ public QueueActor( Injector injector ) {
+
+ this.injector = injector;
qakkaFig = injector.getInstance( QakkaFig.class );
inMemoryQueue = injector.getInstance( InMemoryQueue.class );
@@ -107,7 +112,7 @@ public class QueueActor extends UntypedActor {
getContext().dispatcher(),
getSelf());
timeoutSchedulersByQueueName.put( request.getQueueName(), scheduler );
- logger.debug("Created scheduler for queue {}", request.getQueueName() );
+ logger.debug("Created timeouter for queue {}", request.getQueueName() );
}
if ( shardAllocationSchedulersByQueueName.get( request.getQueueName() ) == null ) {
@@ -126,18 +131,20 @@ public class QueueActor extends UntypedActor {
QueueRefreshRequest request = (QueueRefreshRequest)message;
queuesSeen.add( request.getQueueName() );
-
- if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) {
-
- if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) {
- ActorRef readerRef = getContext().actorOf( Props.create(
- QueueRefresher.class, request.getQueueName()), request.getQueueName() + "_reader");
- queueReadersByQueueName.put( request.getQueueName(), readerRef );
- }
- }
-
- // hand-off to queue's reader
- queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() );
+ queueActorHelper.queueRefresh( request.getQueueName() );
+
+// if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) {
+//
+// if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) {
+// ActorRef readerRef = getContext().actorOf(
+// Props.create( GuiceActorProducer.class, injector, QueueRefresher.class ),
+// request.getQueueName() + "_reader");
+// queueReadersByQueueName.put( request.getQueueName(), readerRef );
+// }
+// }
+//
+// // hand-off to queue's reader
+// queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() );
} else if ( message instanceof QueueTimeoutRequest ) {
QueueTimeoutRequest request = (QueueTimeoutRequest)message;
@@ -145,8 +152,9 @@ public class QueueActor extends UntypedActor {
queuesSeen.add( request.getQueueName() );
if ( queueTimeoutersByQueueName.get( request.getQueueName() ) == null ) {
- ActorRef readerRef = getContext().actorOf( Props.create(
- QueueTimeouter.class, request.getQueueName()), request.getQueueName() + "_timeouter");
+ ActorRef readerRef = getContext().actorOf(
+ Props.create( GuiceActorProducer.class, injector, QueueTimeouter.class),
+ request.getQueueName() + "_timeouter");
queueTimeoutersByQueueName.put( request.getQueueName(), readerRef );
}
@@ -160,8 +168,9 @@ public class QueueActor extends UntypedActor {
queuesSeen.add( request.getQueueName() );
if ( shardAllocatorsByQueueName.get( request.getQueueName() ) == null ) {
- ActorRef readerRef = getContext().actorOf( Props.create(
- ShardAllocator.class, request.getQueueName()), request.getQueueName() + "_shard_allocator");
+ ActorRef readerRef = getContext().actorOf(
+ Props.create( GuiceActorProducer.class, injector, ShardAllocator.class),
+ request.getQueueName() + "_shard_allocator");
shardAllocatorsByQueueName.put( request.getQueueName(), readerRef );
}
@@ -181,15 +190,15 @@ public class QueueActor extends UntypedActor {
while (queueMessages.size() < queueGetRequest.getNumRequested()) {
- DatabaseQueueMessage queueMessage = inMemoryQueue.poll( queueGetRequest.getQueueName() );
+ DatabaseQueueMessage queueMessage = inMemoryQueue.peek( queueGetRequest.getQueueName() );
if (queueMessage != null) {
if (queueActorHelper.putInflight( queueGetRequest.getQueueName(), queueMessage )) {
queueMessages.add( queueMessage );
}
} else {
-// logger.debug("in-memory queue for {} is empty, object is: {}",
-// queueGetRequest.getQueueName(), inMemoryQueue );
+ logger.debug("in-memory queue for {} is empty, object is: {}",
+ queueGetRequest.getQueueName(), inMemoryQueue );
break;
}
}
@@ -199,6 +208,9 @@ public class QueueActor extends UntypedActor {
DatabaseQueueMessage.Type.DEFAULT,
queueMessages.size());
+ logger.debug("{} returning {} for queue {}",
+ this, queueMessages.size(), queueGetRequest.getQueueName());
+
getSender().tell( new QueueGetResponse(
DistributedQueueService.Status.SUCCESS, queueMessages ), getSender() );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
index 26db903..68250df 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
@@ -19,17 +19,28 @@
package org.apache.usergrid.persistence.qakka.distributed.actors;
+import com.codahale.metrics.Timer;
import com.google.inject.Inject;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.text.DecimalFormat;
+import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
public class QueueActorHelper {
@@ -38,18 +49,33 @@ public class QueueActorHelper {
private final ActorSystemFig actorSystemFig;
private final QueueMessageSerialization messageSerialization;
private final AuditLogSerialization auditLogSerialization;
+ private final InMemoryQueue inMemoryQueue;
+ private final QakkaFig qakkaFig;
+ private final MetricsService metricsService;
+ private final CassandraClient cassandraClient;
+
+ private final AtomicLong runCount = new AtomicLong(0);
+ private final AtomicLong totalRead = new AtomicLong(0);
@Inject
public QueueActorHelper(
- ActorSystemFig actorSystemFig,
+ QakkaFig qakkaFig,
+ ActorSystemFig actorSystemFig,
QueueMessageSerialization messageSerialization,
- AuditLogSerialization auditLogSerialization
+ AuditLogSerialization auditLogSerialization,
+ InMemoryQueue inMemoryQueue,
+ MetricsService metricsService,
+ CassandraClient cassandraClient
) {
- this.actorSystemFig = actorSystemFig;
- this.messageSerialization = messageSerialization;
+ this.actorSystemFig = actorSystemFig;
+ this.messageSerialization = messageSerialization;
this.auditLogSerialization = auditLogSerialization;
+ this.inMemoryQueue = inMemoryQueue;
+ this.qakkaFig = qakkaFig;
+ this.metricsService = metricsService;
+ this.cassandraClient = cassandraClient;
}
@@ -78,7 +104,7 @@ public class QueueActorHelper {
queueName, queueMessageId, DatabaseQueueMessage.Type.INFLIGHT );
if ( queueMessage == null ) {
- return DistributedQueueService.Status.BAD_REQUEST;
+ return DistributedQueueService.Status.NOT_INFLIGHT;
}
boolean error = false;
@@ -164,4 +190,73 @@ public class QueueActorHelper {
return true;
}
+
+ void queueRefresh( String queueName ) {
+
+ Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
+
+ try {
+
+ if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) {
+
+ ShardIterator shardIterator = new ShardIterator(
+ cassandraClient, queueName, actorSystemFig.getRegionLocal(),
+ Shard.Type.DEFAULT, Optional.empty() );
+
+ UUID since = inMemoryQueue.getNewest( queueName );
+
+ String region = actorSystemFig.getRegionLocal();
+ MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
+ cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT,
+ shardIterator, since);
+
+ int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName );
+ int count = 0;
+
+ while ( multiShardIterator.hasNext() && count < need ) {
+ DatabaseQueueMessage queueMessage = multiShardIterator.next();
+ inMemoryQueue.add( queueName, queueMessage );
+ count++;
+ }
+
+ long runs = runCount.incrementAndGet();
+ long readCount = totalRead.addAndGet( count );
+
+ if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
+
+ final DecimalFormat format = new DecimalFormat("##.###");
+ final long nano = 1000000000;
+ Timer t = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME );
+
+ logger.debug("QueueRefresher for queue '{}' stats:\n" +
+ " Num runs={}\n" +
+ " Read count={}\n" +
+ " Mean={}\n" +
+ " One min rate={}\n" +
+ " Five min rate={}\n" +
+ " Snapshot mean={}\n" +
+ " Snapshot min={}\n" +
+ " Snapshot max={}",
+ queueName,
+ t.getCount(),
+ readCount,
+ format.format( t.getMeanRate() ),
+ format.format( t.getOneMinuteRate() ),
+ format.format( t.getFiveMinuteRate() ),
+ format.format( t.getSnapshot().getMean() / nano ),
+ format.format( (double) t.getSnapshot().getMin() / nano ),
+ format.format( (double) t.getSnapshot().getMax() / nano ) );
+ }
+
+ if ( count > 0 ) {
+ logger.debug( "Added {} in-memory for queue {}, new size = {}",
+ count, queueName, inMemoryQueue.size( queueName ) );
+ }
+ }
+
+ } finally {
+ timer.close();
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
index 97e591c..9257a0d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
@@ -24,6 +24,9 @@ import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.routing.ConsistentHashingRouter;
import akka.routing.FromConfig;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
import org.apache.usergrid.persistence.qakka.distributed.messages.*;
@@ -35,9 +38,11 @@ public class QueueActorRouter extends UntypedActor {
private final ActorRef routerRef;
- public QueueActorRouter() {
- routerRef = getContext().actorOf(
- FromConfig.getInstance().props( Props.create(QueueActor.class)), "router");
+ @Inject
+ public QueueActorRouter( Injector injector ) {
+
+ this.routerRef = getContext().actorOf( FromConfig.getInstance().props(
+ Props.create(GuiceActorProducer.class, injector, QueueActor.class)), "router");
}
@Override
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
index dbd5235..2f70088 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
@@ -20,55 +20,20 @@
package org.apache.usergrid.persistence.qakka.distributed.actors;
import akka.actor.UntypedActor;
-import com.codahale.metrics.Timer;
-import com.google.inject.Injector;
-import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
-import org.apache.usergrid.persistence.qakka.App;
-import org.apache.usergrid.persistence.qakka.MetricsService;
-import org.apache.usergrid.persistence.qakka.QakkaFig;
-import org.apache.usergrid.persistence.qakka.core.CassandraClient;
-import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
-import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
+import com.google.inject.Inject;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest;
-import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
-import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
-import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
-import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.text.DecimalFormat;
-import java.util.Optional;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
public class QueueRefresher extends UntypedActor {
private static final Logger logger = LoggerFactory.getLogger( QueueRefresher.class );
- private final String queueName;
- private final InMemoryQueue inMemoryQueue;
- private final QakkaFig qakkaFig;
- private final ActorSystemFig actorSystemFig;
- private final MetricsService metricsService;
- private final CassandraClient cassandraClient;
-
- private final AtomicLong runCount = new AtomicLong(0);
- private final AtomicLong totalRead = new AtomicLong(0);
-
+ final QueueActorHelper helper;
- public QueueRefresher(String queueName ) {
- this.queueName = queueName;
-
- Injector injector = App.INJECTOR;
-
- inMemoryQueue = injector.getInstance( InMemoryQueue.class );
- qakkaFig = injector.getInstance( QakkaFig.class );
- actorSystemFig = injector.getInstance( ActorSystemFig.class );
- metricsService = injector.getInstance( MetricsService.class );
- cassandraClient = injector.getInstance( CassandraClientImpl.class );
+ @Inject
+ public QueueRefresher( QueueActorHelper helper ) {
+ this.helper = helper;
}
@@ -78,78 +43,9 @@ public class QueueRefresher extends UntypedActor {
if ( message instanceof QueueRefreshRequest ) {
QueueRefreshRequest request = (QueueRefreshRequest) message;
-
- //logger.debug( "running for queue {}", queueName );
-
- if (!request.getQueueName().equals( queueName )) {
- throw new QakkaRuntimeException(
- "QueueWriter for " + queueName + ": Incorrect queueName " + request.getQueueName() );
- }
-
- Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
-
- try {
-
- if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) {
-
- ShardIterator shardIterator = new ShardIterator(
- cassandraClient, queueName, actorSystemFig.getRegionLocal(),
- Shard.Type.DEFAULT, Optional.empty() );
-
- UUID since = inMemoryQueue.getNewest( queueName );
-
- String region = actorSystemFig.getRegionLocal();
- MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
- cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT,
- shardIterator, since);
-
- int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName );
- int count = 0;
-
- while ( multiShardIterator.hasNext() && count < need ) {
- DatabaseQueueMessage queueMessage = multiShardIterator.next();
- inMemoryQueue.add( queueName, queueMessage );
- count++;
- }
-
- long runs = runCount.incrementAndGet();
- long readCount = totalRead.addAndGet( count );
-
- if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
-
- final DecimalFormat format = new DecimalFormat("##.###");
- final long nano = 1000000000;
- Timer t = metricsService.getMetricRegistry().timer(MetricsService.REFRESH_TIME );
-
- logger.debug("QueueRefresher for queue '{}' stats:\n" +
- " Num runs={}\n" +
- " Read count={}\n" +
- " Mean={}\n" +
- " One min rate={}\n" +
- " Five min rate={}\n" +
- " Snapshot mean={}\n" +
- " Snapshot min={}\n" +
- " Snapshot max={}",
- queueName,
- t.getCount(),
- readCount,
- format.format( t.getMeanRate() ),
- format.format( t.getOneMinuteRate() ),
- format.format( t.getFiveMinuteRate() ),
- format.format( t.getSnapshot().getMean() / nano ),
- format.format( (double) t.getSnapshot().getMin() / nano ),
- format.format( (double) t.getSnapshot().getMax() / nano ) );
- }
-
-// if ( count > 0 ) {
-// logger.debug( "Added {} in-memory for queue {}, new size = {}",
-// count, queueName, inMemoryQueue.size( queueName ) );
-// }
- }
-
- } finally {
- timer.close();
- }
+ logger.debug( "running for queue {}", request.getQueueName() );
+ String queueName = request.getQueueName();
+ helper.queueRefresh( queueName );
} else {
unhandled( message );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
index 03d1216..739e1c4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
@@ -25,6 +25,7 @@ import akka.cluster.client.ClusterClient;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
@@ -63,9 +64,9 @@ public class QueueSender extends UntypedActor {
private final QakkaFig qakkaFig;
private final MetricsService metricsService;
- public QueueSender() {
- Injector injector = App.INJECTOR;
+ @Inject
+ public QueueSender( Injector injector ) {
actorSystemManager = injector.getInstance( ActorSystemManager.class );
transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
index 20603a5..92d0785 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
@@ -23,6 +23,9 @@ import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.routing.FromConfig;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendRequest;
@@ -34,10 +37,11 @@ public class QueueSenderRouter extends UntypedActor {
private final ActorRef router;
- public QueueSenderRouter() {
+ @Inject
+ public QueueSenderRouter( Injector injector ) {
- router = getContext().actorOf(
- FromConfig.getInstance().props(Props.create(QueueSender.class )), "router");
+ this.router = getContext().actorOf( FromConfig.getInstance().props(
+ Props.create( GuiceActorProducer.class, injector, QueueSender.class )), "router");
}
@Override
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
index b47aac6..9b11277 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
import akka.actor.UntypedActor;
import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
import org.apache.usergrid.persistence.qakka.App;
@@ -49,8 +50,6 @@ import java.util.concurrent.atomic.AtomicLong;
public class QueueTimeouter extends UntypedActor {
private static final Logger logger = LoggerFactory.getLogger( QueueTimeouter.class );
- private final String queueName;
-
private final QueueMessageSerialization messageSerialization;
private final MetricsService metricsService;
private final ActorSystemFig actorSystemFig;
@@ -62,10 +61,8 @@ public class QueueTimeouter extends UntypedActor {
private final AtomicLong totalTimedout = new AtomicLong(0);
- public QueueTimeouter(String queueName ) {
- this.queueName = queueName;
-
- Injector injector = App.INJECTOR;
+ @Inject
+ public QueueTimeouter( Injector injector) {
messageSerialization = injector.getInstance( QueueMessageSerialization.class );
actorSystemFig = injector.getInstance( ActorSystemFig.class );
@@ -88,10 +85,7 @@ public class QueueTimeouter extends UntypedActor {
QueueTimeoutRequest request = (QueueTimeoutRequest) message;
- if (!request.getQueueName().equals( queueName )) {
- throw new QakkaRuntimeException(
- "QueueTimeouter for " + queueName + ": Incorrect queueName " + request.getQueueName() );
- }
+ String queueName = request.getQueueName();
//logger.debug("Processing timeouts for queue {} ", queueName );
@@ -171,12 +165,12 @@ public class QueueTimeouter extends UntypedActor {
format.format( (double) t.getSnapshot().getMax() / nano ) );
}
-// if (count > 0) {
-// logger.debug( "Timed out {} messages for queue {}", count, queueName );
-//
-// messageCounterSerialization.decrementCounter(
-// queueName, DatabaseQueueMessage.Type.DEFAULT, count);
-// }
+ if (count > 0) {
+ logger.debug( "Timed out {} messages for queue {}", count, queueName );
+
+ messageCounterSerialization.decrementCounter(
+ queueName, DatabaseQueueMessage.Type.DEFAULT, count);
+ }
} finally {
timer.close();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index e54d916..273f0b2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
import akka.actor.UntypedActor;
import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.usergrid.persistence.qakka.App;
import org.apache.usergrid.persistence.qakka.MetricsService;
@@ -54,9 +55,8 @@ public class QueueWriter extends UntypedActor {
private final MessageCounterSerialization messageCounterSerialization;
- public QueueWriter() {
-
- Injector injector = App.INJECTOR;
+ @Inject
+ public QueueWriter( Injector injector ) {
messageSerialization = injector.getInstance( QueueMessageSerialization.class );
transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
index 9cf06d9..f0540af 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
@@ -23,6 +23,9 @@ import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.routing.FromConfig;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
@@ -33,11 +36,11 @@ public class QueueWriterRouter extends UntypedActor {
private final ActorRef router;
+ @Inject
+ public QueueWriterRouter( Injector injector ) {
- public QueueWriterRouter() {
-
- router = getContext().actorOf(
- FromConfig.getInstance().props(Props.create(QueueWriter.class )), "router");
+ this.router = getContext().actorOf( FromConfig.getInstance().props(
+ Props.create( GuiceActorProducer.class, injector, QueueWriter.class )), "router");
}
@Override
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
index 46e4906..65c3370 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
import akka.actor.UntypedActor;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
import org.apache.usergrid.persistence.qakka.App;
@@ -49,8 +50,6 @@ import java.util.UUID;
public class ShardAllocator extends UntypedActor {
private static final Logger logger = LoggerFactory.getLogger( ShardAllocator.class );
- private final String queueName;
-
private final QakkaFig qakkaFig;
private final ActorSystemFig actorSystemFig;
private final ShardSerialization shardSerialization;
@@ -59,10 +58,8 @@ public class ShardAllocator extends UntypedActor {
private final CassandraClient cassandraClient;
- public ShardAllocator( String queueName ) {
- this.queueName = queueName;
-
- Injector injector = App.INJECTOR;
+ @Inject
+ public ShardAllocator( Injector injector ) {
this.qakkaFig = injector.getInstance( QakkaFig.class );
this.shardCounterSerialization = injector.getInstance( ShardCounterSerializationImpl.class );
@@ -70,8 +67,6 @@ public class ShardAllocator extends UntypedActor {
this.actorSystemFig = injector.getInstance( ActorSystemFig.class );
this.metricsService = injector.getInstance( MetricsService.class );
this.cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
- logger.debug( "Created shard allocator for queue {}", queueName );
}
@@ -82,14 +77,9 @@ public class ShardAllocator extends UntypedActor {
ShardCheckRequest request = (ShardCheckRequest) message;
- if (!request.getQueueName().equals( queueName )) {
- throw new QakkaRuntimeException(
- "ShardAllocator for " + queueName + ": Incorrect queueName " + request.getQueueName() );
- }
-
// check both types of shard
- checkLatestShard( Shard.Type.DEFAULT );
- checkLatestShard( Shard.Type.INFLIGHT );
+ checkLatestShard( request.getQueueName(), Shard.Type.DEFAULT );
+ checkLatestShard( request.getQueueName(), Shard.Type.INFLIGHT );
} else {
unhandled( message );
@@ -97,7 +87,7 @@ public class ShardAllocator extends UntypedActor {
}
- private void checkLatestShard( Shard.Type type ) {
+ private void checkLatestShard( String queueName, Shard.Type type ) {
Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ALLOCATE_TIME).time();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index bcb6b79..e24bdb4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.core.QueueManager;
import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
import org.apache.usergrid.persistence.qakka.distributed.messages.*;
+import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
@@ -134,9 +135,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
String queueName, String sourceRegion, String destRegion, UUID messageId,
Long deliveryTime, Long expirationTime ) {
- List<String> queueNames = queueManager.getListOfQueues();
- if ( !queueNames.contains( queueName ) ) {
- throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+ if ( queueManager.getQueueConfig( queueName ) == null ) {
+ throw new NotFoundException( "Queue not found: " + queueName );
}
int maxRetries = qakkaFig.getMaxSendRetries();
@@ -213,9 +213,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
public Collection<DatabaseQueueMessage> getNextMessagesInternal( String queueName, int count ) {
- List<String> queueNames = queueManager.getListOfQueues();
- if ( !queueNames.contains( queueName ) ) {
- throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+ if ( queueManager.getQueueConfig( queueName ) == null ) {
+ throw new NotFoundException( "Queue not found: " + queueName );
}
if ( actorSystemManager.getClientActor() == null || !actorSystemManager.isReady() ) {
@@ -280,9 +279,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
@Override
public Status ackMessage(String queueName, UUID queueMessageId ) {
- List<String> queueNames = queueManager.getListOfQueues();
- if ( !queueNames.contains( queueName ) ) {
- return Status.BAD_REQUEST;
+ if ( queueManager.getQueueConfig( queueName ) == null ) {
+ throw new NotFoundException( "Queue not found: " + queueName );
}
QueueAckRequest message = new QueueAckRequest( queueName, queueMessageId );
@@ -293,9 +291,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
@Override
public Status requeueMessage(String queueName, UUID messageId) {
- List<String> queueNames = queueManager.getListOfQueues();
- if ( !queueNames.contains( queueName ) ) {
- throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+ if ( queueManager.getQueueConfig( queueName ) == null ) {
+ throw new NotFoundException( "Queue not found: " + queueName );
}
QueueAckRequest message = new QueueAckRequest( queueName, messageId );
@@ -306,9 +303,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
@Override
public Status clearMessages(String queueName) {
- List<String> queueNames = queueManager.getListOfQueues();
- if ( !queueNames.contains( queueName ) ) {
- throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+ if ( queueManager.getQueueConfig( queueName ) == null ) {
+ throw new NotFoundException( "Queue not found: " + queueName );
}
// TODO: implement clear queue
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index c10d1f5..8ce9822 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -70,9 +70,9 @@ public class QueueMessageManagerTest extends AbstractTest {
@Test
public void testBasicOperation() throws Exception {
- Injector injector = getInjector();
+ String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric(15);
- CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
+ Injector injector = getInjector();
DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
@@ -82,54 +82,60 @@ public class QueueMessageManagerTest extends AbstractTest {
app.start( "localhost", getNextAkkaPort(), region );
// create queue and send one message to it
- String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric(15);
QueueManager queueManager = injector.getInstance( QueueManager.class );
- QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class );
- queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
- String jsonData = "{}";
- qmm.sendMessages( queueName, Collections.singletonList(region), null, null,
- "application/json", DataType.serializeValue( jsonData, ProtocolVersion.NEWEST_SUPPORTED) );
- distributedQueueService.refresh();
- Thread.sleep(1000);
+ try {
- // get message from the queue
- List<QueueMessage> messages = qmm.getNextMessages( queueName, 1 );
- Assert.assertEquals( 1, messages.size() );
- QueueMessage message = messages.get(0);
+ QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class );
+ queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) );
+ String jsonData = "{}";
+ qmm.sendMessages( queueName, Collections.singletonList( region ), null, null,
+ "application/json", DataType.serializeValue( jsonData, ProtocolVersion.NEWEST_SUPPORTED ) );
- // test that queue message data is present and correct
- QueueMessageSerialization qms = injector.getInstance( QueueMessageSerialization.class );
- DatabaseQueueMessageBody data = qms.loadMessageData( message.getMessageId() );
- Assert.assertNotNull( data );
- Assert.assertEquals( "application/json", data.getContentType() );
- String jsonDataReturned = new String( data.getBlob().array(), Charset.forName("UTF-8") );
- Assert.assertEquals( jsonData, jsonDataReturned );
-
- // test that transfer log is empty for our queue
- TransferLogSerialization tlogs = injector.getInstance( TransferLogSerialization.class );
- Result<TransferLog> all = tlogs.getAllTransferLogs( null, 1000 );
- List<TransferLog> logs = all.getEntities().stream()
+ distributedQueueService.refresh();
+ Thread.sleep( 1000 );
+
+ // get message from the queue
+ List<QueueMessage> messages = qmm.getNextMessages( queueName, 1 );
+ Assert.assertEquals( 1, messages.size() );
+ QueueMessage message = messages.get( 0 );
+
+ // test that queue message data is present and correct
+ QueueMessageSerialization qms = injector.getInstance( QueueMessageSerialization.class );
+ DatabaseQueueMessageBody data = qms.loadMessageData( message.getMessageId() );
+ Assert.assertNotNull( data );
+ Assert.assertEquals( "application/json", data.getContentType() );
+ String jsonDataReturned = new String( data.getBlob().array(), Charset.forName( "UTF-8" ) );
+ Assert.assertEquals( jsonData, jsonDataReturned );
+
+ // test that transfer log is empty for our queue
+ TransferLogSerialization tlogs = injector.getInstance( TransferLogSerialization.class );
+ Result<TransferLog> all = tlogs.getAllTransferLogs( null, 1000 );
+ List<TransferLog> logs = all.getEntities().stream()
.filter( log -> log.getQueueName().equals( queueName ) ).collect( Collectors.toList() );
- Assert.assertTrue( logs.isEmpty() );
+ Assert.assertTrue( logs.isEmpty() );
- // ack the message
- qmm.ackMessage( queueName, message.getQueueMessageId() );
+ // ack the message
+ qmm.ackMessage( queueName, message.getQueueMessageId() );
- // test that message is no longer stored in non-replicated keyspace
+ // test that message is no longer stored in non-replicated keyspace
- Assert.assertNull( qms.loadMessage( queueName, region, null,
- DatabaseQueueMessage.Type.DEFAULT, message.getQueueMessageId() ));
+ Assert.assertNull( qms.loadMessage( queueName, region, null,
+ DatabaseQueueMessage.Type.DEFAULT, message.getQueueMessageId() ) );
- Assert.assertNull( qms.loadMessage( queueName, region, null,
- DatabaseQueueMessage.Type.INFLIGHT, message.getQueueMessageId() ));
+ Assert.assertNull( qms.loadMessage( queueName, region, null,
+ DatabaseQueueMessage.Type.INFLIGHT, message.getQueueMessageId() ) );
- // test that audit log entry was written
- AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
- Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
- Assert.assertEquals( 3, auditLogs.getEntities().size() );
+ // test that audit log entry was written
+ AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
+ Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
+ Assert.assertEquals( 3, auditLogs.getEntities().size() );
- distributedQueueService.shutdown();
+ distributedQueueService.shutdown();
+
+ } finally {
+ queueManager.deleteQueue( queueName );
+ }
}
@@ -138,8 +144,6 @@ public class QueueMessageManagerTest extends AbstractTest {
Injector injector = getInjector();
- CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
QakkaFig qakkaFig = injector.getInstance( QakkaFig.class );
ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
@@ -152,74 +156,82 @@ public class QueueMessageManagerTest extends AbstractTest {
// create some number of queue messages
QueueManager queueManager = injector.getInstance( QueueManager.class );
- QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class );
- String queueName = "queue_testQueueMessageTimeouts_" + RandomStringUtils.randomAlphanumeric(15);
- queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
- int numMessages = 40;
+ String queueName = "queue_testQueueMessageTimeouts_" + RandomStringUtils.randomAlphanumeric( 15 );
- for ( int i=0; i<numMessages; i++ ) {
- qmm.sendMessages(
+ try {
+
+ QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class );
+ queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) );
+
+ int numMessages = 40;
+
+ for (int i = 0; i < numMessages; i++) {
+ qmm.sendMessages(
queueName,
Collections.singletonList( region ),
null, // delay
null, // expiration
"application/json",
DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) );
- }
+ }
- int maxRetries = 15;
- int retries = 0;
- while ( retries++ < maxRetries ) {
- distributedQueueService.refresh();
- if (inMemoryQueue.size( queueName ) == 40) {
- break;
+ int maxRetries = 15;
+ int retries = 0;
+ while (retries++ < maxRetries) {
+ distributedQueueService.refresh();
+ if (inMemoryQueue.size( queueName ) == 40) {
+ break;
+ }
+ Thread.sleep( 500 );
}
- Thread.sleep( 500 );
- }
- Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) );
+ Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) );
- // get all messages from queue
+ // get all messages from queue
- List<QueueMessage> messages = qmm.getNextMessages( queueName, numMessages );
- Assert.assertEquals( numMessages, messages.size() );
+ List<QueueMessage> messages = qmm.getNextMessages( queueName, numMessages );
+ Assert.assertEquals( numMessages, messages.size() );
- // ack half of the messages
+ // ack half of the messages
- List<QueueMessage> remove = new ArrayList<>();
+ List<QueueMessage> remove = new ArrayList<>();
- for ( int i=0; i<numMessages/2; i++ ) {
- QueueMessage queueMessage = messages.get( i );
- qmm.ackMessage( queueName, queueMessage.getQueueMessageId() );
- remove.add( queueMessage );
- }
+ for (int i = 0; i < numMessages / 2; i++) {
+ QueueMessage queueMessage = messages.get( i );
+ qmm.ackMessage( queueName, queueMessage.getQueueMessageId() );
+ remove.add( queueMessage );
+ }
- for ( QueueMessage message : remove ) {
- messages.remove( message );
- }
+ for (QueueMessage message : remove) {
+ messages.remove( message );
+ }
- // wait for twice timeout period
+ // wait for twice timeout period
- Thread.sleep( 2 * qakkaFig.getQueueTimeoutSeconds()*1000 );
+ Thread.sleep( 2 * qakkaFig.getQueueTimeoutSeconds() * 1000 );
- distributedQueueService.processTimeouts();
+ distributedQueueService.processTimeouts();
- Thread.sleep( qakkaFig.getQueueTimeoutSeconds()*1000 );
+ Thread.sleep( qakkaFig.getQueueTimeoutSeconds() * 1000 );
- // attempt to ack other half of messages
+ // attempt to ack other half of messages
- for ( QueueMessage message : messages ) {
- try {
- qmm.ackMessage( queueName, message.getQueueMessageId() );
- Assert.fail("Message should have timed out by now");
+ for (QueueMessage message : messages) {
+ try {
+ qmm.ackMessage( queueName, message.getQueueMessageId() );
+ Assert.fail( "Message should have timed out by now" );
- } catch ( QakkaRuntimeException expected ) {
- // keep on going...
+ } catch (QakkaRuntimeException expected) {
+ // keep on going...
+ }
}
- }
- distributedQueueService.shutdown();
+ distributedQueueService.shutdown();
+
+ } finally {
+ queueManager.deleteQueue( queueName );
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index 7423424..53f9224 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -78,35 +78,42 @@ public class QueueActorServiceTest extends AbstractTest {
QueueManager queueManager = injector.getInstance( QueueManager.class );
queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
- // send 1 queue message, get back one queue message
- UUID messageId = UUIDGen.getTimeUUID();
+ try {
- final String data = "my test data";
- final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
+ // send 1 queue message, get back one queue message
+ UUID messageId = UUIDGen.getTimeUUID();
+
+ final String data = "my test data";
+ final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" );
- serialization.writeMessageData( messageId, messageBody );
+ serialization.writeMessageData( messageId, messageBody );
- distributedQueueService.sendMessageToRegion(
- queueName, region, region, messageId, null, null);
+ distributedQueueService.sendMessageToRegion(
+ queueName, region, region, messageId, null, null );
- distributedQueueService.refresh();
- Thread.sleep(1000);
+ distributedQueueService.refresh();
+ Thread.sleep( 1000 );
- Collection<DatabaseQueueMessage> qmReturned = distributedQueueService.getNextMessages( queueName, 1 );
- Assert.assertEquals( 1, qmReturned.size() );
+ Collection<DatabaseQueueMessage> qmReturned = distributedQueueService.getNextMessages( queueName, 1 );
+ Assert.assertEquals( 1, qmReturned.size() );
- DatabaseQueueMessage dqm = qmReturned.iterator().next();
- DatabaseQueueMessageBody dqmb = serialization.loadMessageData( dqm.getMessageId() );
- ByteBuffer blob = dqmb.getBlob();
+ DatabaseQueueMessage dqm = qmReturned.iterator().next();
+ DatabaseQueueMessageBody dqmb = serialization.loadMessageData( dqm.getMessageId() );
+ ByteBuffer blob = dqmb.getBlob();
- String returnedData = new String( blob.array(), "UTF-8");
+ String returnedData = new String( blob.array(), "UTF-8" );
// ByteArrayInputStream bais = new ByteArrayInputStream( blob.array() );
// ObjectInputStream ios = new ObjectInputStream( bais );
// String returnedData = (String)ios.readObject();
- Assert.assertEquals( data, returnedData );
+ Assert.assertEquals( data, returnedData );
+
+ distributedQueueService.shutdown();
+
+ } finally {
+ queueManager.deleteQueue( queueName );
+ }
- distributedQueueService.shutdown();
}
@@ -128,51 +135,58 @@ public class QueueActorServiceTest extends AbstractTest {
String queueName = "queue_testGetMultipleQueueMessages_" + UUID.randomUUID();
QueueManager queueManager = injector.getInstance( QueueManager.class );
- queueManager.createQueue(
- new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
- for ( int i=0; i<100; i++ ) {
+ try {
- UUID messageId = UUIDGen.getTimeUUID();
+ queueManager.createQueue(
+ new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) );
- final String data = "my test data";
- final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
+ for (int i = 0; i < 100; i++) {
+
+ UUID messageId = UUIDGen.getTimeUUID();
+
+ final String data = "my test data";
+ final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" );
- serialization.writeMessageData( messageId, messageBody );
+ serialization.writeMessageData( messageId, messageBody );
- xferLogSerialization.recordTransferLog(
- queueName, actorSystemFig.getRegionLocal(), region, messageId );
+ xferLogSerialization.recordTransferLog(
+ queueName, actorSystemFig.getRegionLocal(), region, messageId );
- distributedQueueService.sendMessageToRegion(
- queueName, region, region, messageId , null, null);
- }
+ distributedQueueService.sendMessageToRegion(
+ queueName, region, region, messageId, null, null );
+ }
- int maxRetries = 25;
- int retries = 0;
- int count = 0;
- while ( retries++ < maxRetries ) {
- distributedQueueService.refresh();
- if (inMemoryQueue.size( queueName ) == 100) {
- count = 100;
- break;
+ int maxRetries = 25;
+ int retries = 0;
+ int count = 0;
+ while (retries++ < maxRetries) {
+ distributedQueueService.refresh();
+ if (inMemoryQueue.size( queueName ) == 100) {
+ count = 100;
+ break;
+ }
+ Thread.sleep( 1000 );
}
- Thread.sleep(1000);
- }
- Assert.assertEquals( 100, count );
+ Assert.assertEquals( 100, count );
- Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
- Assert.assertEquals( 75, inMemoryQueue.size( queueName ) );
+ Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
+ Assert.assertEquals( 75, inMemoryQueue.size( queueName ) );
- Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
- Assert.assertEquals( 50, inMemoryQueue.size( queueName ) );
+ Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
+ Assert.assertEquals( 50, inMemoryQueue.size( queueName ) );
- Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
- Assert.assertEquals( 25, inMemoryQueue.size( queueName ) );
+ Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
+ Assert.assertEquals( 25, inMemoryQueue.size( queueName ) );
- Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
- Assert.assertEquals( 0, inMemoryQueue.size( queueName ) );
+ Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
+ Assert.assertEquals( 0, inMemoryQueue.size( queueName ) );
- distributedQueueService.shutdown();
+ distributedQueueService.shutdown();
+
+ } finally {
+ queueManager.deleteQueue( queueName );
+ }
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
index 3bf352f..791650e 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
@@ -53,7 +53,6 @@ public class QueueActorHelperTest extends AbstractTest {
public void loadDatabaseQueueMessage() throws Exception {
Injector injector = getInjector();
- CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
injector.getInstance( App.class ); // init the INJECTOR
@@ -66,33 +65,39 @@ public class QueueActorHelperTest extends AbstractTest {
app.start( "localhost", getNextAkkaPort(), region );
String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
- queueManager.createQueue( new Queue( queueName ) );
- UUID queueMessageId = QakkaUtils.getTimeUuid();
+ try {
+ queueManager.createQueue( new Queue( queueName ) );
- // write message
+ UUID queueMessageId = QakkaUtils.getTimeUuid();
- DatabaseQueueMessage message = new DatabaseQueueMessage(
- QakkaUtils.getTimeUuid(),
- DatabaseQueueMessage.Type.DEFAULT,
- queueName,
- actorSystemFig.getRegionLocal(),
- null,
- System.currentTimeMillis(),
- null,
- queueMessageId);
- qms.writeMessage( message );
+ // write message
+
+ DatabaseQueueMessage message = new DatabaseQueueMessage(
+ QakkaUtils.getTimeUuid(),
+ DatabaseQueueMessage.Type.DEFAULT,
+ queueName,
+ actorSystemFig.getRegionLocal(),
+ null,
+ System.currentTimeMillis(),
+ null,
+ queueMessageId);
+ qms.writeMessage( message );
- // load message
+ // load message
- QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
- DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage(
- queueName, message.getQueueMessageId(), message.getType() );
+ QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+ DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage(
+ queueName, message.getQueueMessageId(), message.getType() );
- Assert.assertNotNull( queueMessage );
+ Assert.assertNotNull( queueMessage );
- DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
- distributedQueueService.shutdown();
+ DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
+ distributedQueueService.shutdown();
+
+ } finally {
+ queueManager.deleteQueue( queueName );
+ }
}
@@ -100,8 +105,6 @@ public class QueueActorHelperTest extends AbstractTest {
public void loadDatabaseQueueMessageNotFound() throws Exception {
Injector injector = getInjector();
- CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
injector.getInstance( App.class ); // init the INJECTOR
QueueManager queueManager = injector.getInstance( QueueManager.class );
@@ -112,20 +115,27 @@ public class QueueActorHelperTest extends AbstractTest {
app.start( "localhost", getNextAkkaPort(), region );
String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+
queueManager.createQueue( new Queue( queueName ) );
- // don't write any message
+ try {
+
+ // don't write any message
- // load message
+ // load message
- QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
- DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage(
+ QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+ DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage(
queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.DEFAULT );
- Assert.assertNull( queueMessage );
+ Assert.assertNull( queueMessage );
- DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
- distributedQueueService.shutdown();
+ DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
+ distributedQueueService.shutdown();
+
+ } finally {
+ queueManager.deleteQueue( queueName );
+ }
}
@@ -133,8 +143,6 @@ public class QueueActorHelperTest extends AbstractTest {
public void putInflight() throws Exception {
Injector injector = getInjector();
- CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
injector.getInstance( App.class ); // init the INJECTOR
@@ -153,7 +161,9 @@ public class QueueActorHelperTest extends AbstractTest {
String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
queueManager.createQueue( new Queue( queueName ) );
- DatabaseQueueMessage message = new DatabaseQueueMessage(
+ try {
+
+ DatabaseQueueMessage message = new DatabaseQueueMessage(
QakkaUtils.getTimeUuid(),
DatabaseQueueMessage.Type.DEFAULT,
queueName,
@@ -161,42 +171,46 @@ public class QueueActorHelperTest extends AbstractTest {
null,
System.currentTimeMillis(),
null,
- queueMessageId);
- qms.writeMessage( message );
+ queueMessageId );
+ qms.writeMessage( message );
- // put message inflight
+ // put message inflight
- QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
- helper.putInflight( queueName, message );
+ QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+ helper.putInflight( queueName, message );
- // message must be gone from messages_available table
+ // message must be gone from messages_available table
- Assert.assertNull( qms.loadMessage(
+ Assert.assertNull( qms.loadMessage(
queueName,
actorSystemFig.getRegionLocal(),
null,
DatabaseQueueMessage.Type.DEFAULT,
message.getQueueMessageId() ) );
- // message must be present in messages_inflight table
+ // message must be present in messages_inflight table
- Assert.assertNotNull( qms.loadMessage(
+ Assert.assertNotNull( qms.loadMessage(
queueName,
actorSystemFig.getRegionLocal(),
null,
DatabaseQueueMessage.Type.INFLIGHT,
message.getQueueMessageId() ) );
- // there must be an audit log record of the successful get operation
+ // there must be an audit log record of the successful get operation
+
+ AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
+ Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
+ Assert.assertEquals( 1, auditLogs.getEntities().size() );
+ Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get( 0 ).getStatus() );
+ Assert.assertEquals( AuditLog.Action.GET, auditLogs.getEntities().get( 0 ).getAction() );
- AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
- Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
- Assert.assertEquals( 1, auditLogs.getEntities().size() );
- Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get(0).getStatus() );
- Assert.assertEquals( AuditLog.Action.GET, auditLogs.getEntities().get(0).getAction() );
+ DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
+ distributedQueueService.shutdown();
- DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
- distributedQueueService.shutdown();
+ } finally {
+ queueManager.deleteQueue( queueName );
+ }
}
@@ -222,9 +236,11 @@ public class QueueActorHelperTest extends AbstractTest {
String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
queueManager.createQueue( new Queue( queueName ) );
- // write message to messages_inflight table
+ try {
- DatabaseQueueMessage message = new DatabaseQueueMessage(
+ // write message to messages_inflight table
+
+ DatabaseQueueMessage message = new DatabaseQueueMessage(
QakkaUtils.getTimeUuid(),
DatabaseQueueMessage.Type.INFLIGHT,
queueName,
@@ -232,34 +248,38 @@ public class QueueActorHelperTest extends AbstractTest {
null,
System.currentTimeMillis(),
null,
- queueMessageId);
- qms.writeMessage( message );
+ queueMessageId );
+ qms.writeMessage( message );
+
+ // ack message
- // ack message
+ QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+ helper.ackQueueMessage( queueName, message.getQueueMessageId() );
- QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
- helper.ackQueueMessage( queueName, message.getQueueMessageId() );
+ // message must be gone from messages_inflight table
- // message must be gone from messages_available table
+ Assert.assertNull( helper.loadDatabaseQueueMessage(
+ queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.INFLIGHT ) );
- Assert.assertNull( helper.loadDatabaseQueueMessage(
- queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.INFLIGHT ));
+ // message must be gone from messages_available table
- // message must be gone from messages_inflight table
+ Assert.assertNull( helper.loadDatabaseQueueMessage(
+ queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.DEFAULT ) );
- Assert.assertNull( helper.loadDatabaseQueueMessage(
- queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.DEFAULT ));
+ // there should be an audit log record of the successful ack operation
- // there should be an audit log record of the successful ack operation
+ AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
+ Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
+ Assert.assertEquals( 1, auditLogs.getEntities().size() );
+ Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get( 0 ).getStatus() );
+ Assert.assertEquals( AuditLog.Action.ACK, auditLogs.getEntities().get( 0 ).getAction() );
- AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
- Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
- Assert.assertEquals( 1, auditLogs.getEntities().size() );
- Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get(0).getStatus() );
- Assert.assertEquals( AuditLog.Action.ACK, auditLogs.getEntities().get(0).getAction() );
+ DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
+ distributedQueueService.shutdown();
- DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
- distributedQueueService.shutdown();
+ } finally {
+ queueManager.deleteQueue( queueName );
+ }
}
@@ -267,8 +287,6 @@ public class QueueActorHelperTest extends AbstractTest {
public void ackQueueMessageNotFound() throws Exception {
Injector injector = getInjector();
- CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
injector.getInstance( App.class ); // init the INJECTOR
QueueManager queueManager = injector.getInstance( QueueManager.class );
@@ -281,17 +299,23 @@ public class QueueActorHelperTest extends AbstractTest {
String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
queueManager.createQueue( new Queue( queueName ) );
- // don't write message, just make up some bogus IDs
+ try {
- UUID queueMessageId = QakkaUtils.getTimeUuid();
+ // don't write message, just make up some bogus IDs
+
+ UUID queueMessageId = QakkaUtils.getTimeUuid();
+
+ // ack message must fail
- // ack message must fail
+ QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+ Assert.assertEquals( DistributedQueueService.Status.NOT_INFLIGHT,
+ helper.ackQueueMessage( queueName, queueMessageId ) );
- QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
- Assert.assertEquals( DistributedQueueService.Status.BAD_REQUEST,
- helper.ackQueueMessage( queueName, queueMessageId ));
+ DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
+ distributedQueueService.shutdown();
- DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
- distributedQueueService.shutdown();
+ } finally {
+ queueManager.deleteQueue( queueName );
+ }
}
}
[07/10] usergrid git commit: Make DatabaseQueueMessage serializable
because it is sent in Akka messages
Posted by sn...@apache.org.
Make DatabaseQueueMessage serializable because it is sent in Akka messages
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/00eb1396
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/00eb1396
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/00eb1396
Branch: refs/heads/usergrid-1318-queue
Commit: 00eb13965e2b31bc1c57e437ceac0a42f6610861
Parents: c08e02a
Author: Dave Johnson <sn...@apache.org>
Authored: Thu Sep 22 07:47:44 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Thu Sep 22 07:47:44 2016 -0400
----------------------------------------------------------------------
.../qakka/serialization/queuemessages/DatabaseQueueMessage.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/00eb1396/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java
index dab47d5..89a3eeb 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java
@@ -19,10 +19,11 @@
package org.apache.usergrid.persistence.qakka.serialization.queuemessages;
+import java.io.Serializable;
import java.util.UUID;
-public class DatabaseQueueMessage {
+public class DatabaseQueueMessage implements Serializable {
public enum Type {
DEFAULT, INFLIGHT