You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/16 22:34:47 UTC
[18/25] usergrid git commit: Switch Qakka to using two keyspaces:
the original replicated Applications keyspace and a new un-replicated
Applications Local keyspace
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java
index 5bb06fd..9ebb841 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java
@@ -25,6 +25,7 @@ import com.datastax.driver.core.Row;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.inject.Inject;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
@@ -44,6 +45,7 @@ public class TransferLogSerializationImpl implements TransferLogSerialization {
private static final Logger logger = LoggerFactory.getLogger( TransferLogSerializationImpl.class );
private final CassandraClient cassandraClient;
+ private final CassandraFig cassandraFig;
public final static String TABLE_TRANSFER_LOG = "transfer_log";
@@ -65,7 +67,8 @@ public class TransferLogSerializationImpl implements TransferLogSerialization {
@Inject
- public TransferLogSerializationImpl( CassandraClient cassandraClient ) {
+ public TransferLogSerializationImpl( CassandraFig cassandraFig, CassandraClient cassandraClient ) {
+ this.cassandraFig = cassandraFig;
this.cassandraClient = cassandraClient;
}
@@ -80,7 +83,7 @@ public class TransferLogSerializationImpl implements TransferLogSerialization {
.value(COLUMN_DEST_REGION, dest )
.value(COLUMN_MESSAGE_ID, messageId )
.value(COLUMN_TRANSFER_TIME, System.currentTimeMillis() );
- cassandraClient.getSession().execute(insert);
+ cassandraClient.getApplicationSession().execute(insert);
// logger.debug("Recorded transfer log for queue {} dest {} messageId {}",
// queueName, dest, messageId);
@@ -95,7 +98,7 @@ public class TransferLogSerializationImpl implements TransferLogSerialization {
.where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ))
.and( QueryBuilder.eq( COLUMN_DEST_REGION, dest ))
.and( QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId ));
- ResultSet rs = cassandraClient.getSession().execute( query );
+ ResultSet rs = cassandraClient.getApplicationSession().execute( query );
if ( rs.getAvailableWithoutFetching() == 0 ) {
StringBuilder sb = new StringBuilder();
@@ -109,7 +112,7 @@ public class TransferLogSerializationImpl implements TransferLogSerialization {
.where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ))
.and( QueryBuilder.eq( COLUMN_DEST_REGION, dest ))
.and( QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId ));
- cassandraClient.getSession().execute( deleteQuery );
+ cassandraClient.getApplicationSession().execute( deleteQuery );
}
@@ -123,7 +126,7 @@ public class TransferLogSerializationImpl implements TransferLogSerialization {
query.setPagingState( pagingState );
}
- ResultSet rs = cassandraClient.getSession().execute( query );
+ ResultSet rs = cassandraClient.getApplicationSession().execute( query );
final PagingState newPagingState = rs.getExecutionInfo().getPagingState();
final List<TransferLog> transferLogs = new ArrayList<>();
@@ -160,7 +163,8 @@ public class TransferLogSerializationImpl implements TransferLogSerialization {
@Override
public Collection<TableDefinition> getTables() {
- return Collections.singletonList( new TableDefinitionStringImpl( TABLE_TRANSFER_LOG, CQL ) );
+ return Collections.singletonList(
+ new TableDefinitionStringImpl( cassandraFig.getApplicationKeyspace(), TABLE_TRANSFER_LOG, CQL ) );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java
index 4c3e480..6f1c744 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java
@@ -29,6 +29,7 @@ import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -39,11 +40,13 @@ public class AbstractTest {
protected static Injector sharedInjector;
+ AtomicBoolean migrated = new AtomicBoolean( false );
+
static { new KeyspaceDropper(); }
public AbstractTest() {
- if ( getInjector() == null ) {
+ if ( !migrated.getAndSet( true ) ) {
setInjector( Guice.createInjector( new TestModule() ) );
MigrationManager migrationManager = getInjector().getInstance( MigrationManager.class );
try {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java
index aa4dfd1..e220650 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java
@@ -33,13 +33,13 @@ import java.util.Properties;
* Created by Dave Johnson (snoopdave@apache.org) on 9/9/16.
*/
public class KeyspaceDropper {
-
+
private static final Logger logger = LoggerFactory.getLogger( AbstractTest.class );
-
- static { dropTestKeyspace(); }
-
- public static void dropTestKeyspace() {
+ static { dropTestKeyspaces(); }
+
+
+ public static void dropTestKeyspaces() {
String propsFileName = "qakka.properties";
@@ -50,10 +50,17 @@ public class KeyspaceDropper {
throw new RuntimeException( "Unable to load " + propsFileName + " file!" );
}
- String keyspace = (String)props.get("cassandra.keyspace.application");
+ String keyspaceApp = (String)props.get("cassandra.keyspace.application");
+ String keyspaceQueue = (String)props.get("cassandra.keyspace.queue-message");
String hosts[] = props.getProperty( "cassandra.hosts", "127.0.0.1" ).split(",");
int port = Integer.parseInt( props.getProperty( "cassandra.port", "9042" ));
+ dropTestKeyspace( keyspaceApp, hosts, port );
+ dropTestKeyspace( keyspaceQueue, hosts, port );
+ }
+
+ public static void dropTestKeyspace( String keyspace, String[] hosts, int port ) {
+
Cluster.Builder builder = Cluster.builder();
for ( String host : hosts ) {
builder = builder.addContactPoint( host ).withPort( port );
@@ -67,4 +74,5 @@ public class KeyspaceDropper {
logger.info("Dropping test keyspace: {}", keyspace);
session.execute( "DROP KEYSPACE IF EXISTS " + keyspace );
}
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java
index 42423fa..e1f0c7e 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java
@@ -30,13 +30,13 @@ import org.junit.Test;
* Created by russo on 6/8/16.
*/
public class CassandraClientTest extends AbstractTest {
-
+
@Test
public void getClient(){
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- Session session = cassandraClient.getSession();
+ Session session = cassandraClient.getApplicationSession();
session.getLoggedKeyspace();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index 5a0feba..630c953 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -26,14 +26,13 @@ import com.google.inject.Injector;
import net.jcip.annotations.NotThreadSafe;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
-import org.apache.usergrid.persistence.qakka.QakkaFig;
-import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
-import org.apache.usergrid.persistence.qakka.serialization.Result;
import org.apache.usergrid.persistence.qakka.AbstractTest;
import org.apache.usergrid.persistence.qakka.App;
-import org.apache.usergrid.persistence.qakka.QakkaModule;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.Result;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
@@ -42,8 +41,8 @@ import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMe
import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLog;
import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
import org.apache.usergrid.persistence.queue.TestModule;
-import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,7 +73,6 @@ public class QueueMessageManagerTest extends AbstractTest {
Injector injector = getInjector();
CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
@@ -136,12 +134,12 @@ public class QueueMessageManagerTest extends AbstractTest {
@Test
+ @Ignore
public void testQueueMessageTimeouts() throws Exception {
Injector injector = getInjector();
CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
QakkaFig qakkaFig = injector.getInstance( QakkaFig.class );
@@ -225,12 +223,12 @@ public class QueueMessageManagerTest extends AbstractTest {
@Test
+ @Ignore
public void testGetWithMissingData() throws InterruptedException {
Injector injector = getInjector();
CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
injector.getInstance( App.class ); // init the INJECTOR
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index 182d5d6..a46c186 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -66,7 +66,6 @@ public class QueueActorServiceTest extends AbstractTest {
Injector injector = getInjector();
CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
String region = actorSystemFig.getRegionLocal();
@@ -119,7 +118,6 @@ public class QueueActorServiceTest extends AbstractTest {
Injector injector = getInjector();
CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
String region = actorSystemFig.getRegionLocal();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
index 77c11e4..3bf352f 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
@@ -21,26 +21,26 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
import com.google.inject.Guice;
import com.google.inject.Injector;
+import net.jcip.annotations.NotThreadSafe;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
import org.apache.usergrid.persistence.qakka.AbstractTest;
import org.apache.usergrid.persistence.qakka.App;
-import org.apache.usergrid.persistence.qakka.QakkaModule;
import org.apache.usergrid.persistence.qakka.core.*;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
import org.apache.usergrid.persistence.qakka.serialization.Result;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
-import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
import org.apache.usergrid.persistence.queue.TestModule;
-import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import java.util.UUID;
+@NotThreadSafe
public class QueueActorHelperTest extends AbstractTest {
@@ -54,7 +54,6 @@ public class QueueActorHelperTest extends AbstractTest {
Injector injector = getInjector();
CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
injector.getInstance( App.class ); // init the INJECTOR
@@ -102,7 +101,6 @@ public class QueueActorHelperTest extends AbstractTest {
Injector injector = getInjector();
CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
injector.getInstance( App.class ); // init the INJECTOR
@@ -136,7 +134,6 @@ public class QueueActorHelperTest extends AbstractTest {
Injector injector = getInjector();
CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
injector.getInstance( App.class ); // init the INJECTOR
@@ -208,7 +205,6 @@ public class QueueActorHelperTest extends AbstractTest {
Injector injector = getInjector();
CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
injector.getInstance( App.class ); // init the INJECTOR
@@ -272,7 +268,6 @@ public class QueueActorHelperTest extends AbstractTest {
Injector injector = getInjector();
CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
injector.getInstance( App.class ); // init the INJECTOR
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
index b803f7e..0b8b795 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
@@ -53,7 +53,6 @@ public class QueueReaderTest extends AbstractTest {
public void testBasicOperation() throws Exception {
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
getInjector().getInstance( App.class ); // init the INJECTOR
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
index 511b059..54f9d42 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
@@ -38,6 +38,7 @@ import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,12 +52,12 @@ import java.util.UUID;
public class QueueTimeouterTest extends AbstractTest {
private static final Logger logger = LoggerFactory.getLogger( QueueTimeouterTest.class );
-
+
@Test
+ @Ignore
public void testBasicOperation() throws Exception {
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
getInjector().getInstance( App.class ); // init the INJECTOR
@@ -64,12 +65,12 @@ public class QueueTimeouterTest extends AbstractTest {
ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
QueueMessageSerialization qms = getInjector().getInstance( QueueMessageSerialization.class );
ShardSerialization shardSerialization = getInjector().getInstance( ShardSerialization.class );
-
- // create records in inflight table, with some being old enough to time out
+
+ // create records in inflight table, with some being old enough to time out
int numInflight = 200; // number of messages to be put into timeout table
int numTimedout = 75; // number of messages to be timedout
-
+
long timeoutMs = qakkaFig.getQueueTimeoutSeconds()*1000;
String queueName = "qtt_queue_" + RandomStringUtils.randomAlphanumeric( 20 );
@@ -83,12 +84,12 @@ public class QueueTimeouterTest extends AbstractTest {
shardSerialization.createShard( newShard );
for ( int i=0; i<numInflight; i++ ) {
-
+
long created = System.currentTimeMillis();
created = i < numTimedout ? created - timeoutMs: created + timeoutMs;
UUID queueMessageId = QakkaUtils.getTimeUuid();
-
+
UUID messageId = QakkaUtils.getTimeUuid();
DatabaseQueueMessage message = new DatabaseQueueMessage(
messageId,
@@ -99,32 +100,32 @@ public class QueueTimeouterTest extends AbstractTest {
created,
created,
queueMessageId );
-
+
qms.writeMessage( message );
}
- List<DatabaseQueueMessage> inflightMessages = getDatabaseQueueMessages(
+ List<DatabaseQueueMessage> inflightMessages = getDatabaseQueueMessages(
cassandraClient, queueName, actorSystemFig.getRegionLocal(), Shard.Type.INFLIGHT );
Assert.assertEquals( numInflight, inflightMessages.size() );
-
+
// run timeouter actor
ActorSystem system = ActorSystem.create("Test-" + queueName);
ActorRef timeouterRef = system.actorOf( Props.create( QueueTimeouter.class, queueName ), "timeouter");
QueueTimeoutRequest qtr = new QueueTimeoutRequest( queueName );
timeouterRef.tell( qtr, null ); // tell sends message, returns immediately
-
+
Thread.sleep( timeoutMs );
// timed out messages should have been moved into available (DEFAULT) table
-
- List<DatabaseQueueMessage> queuedMessages = getDatabaseQueueMessages(
+
+ List<DatabaseQueueMessage> queuedMessages = getDatabaseQueueMessages(
cassandraClient, queueName, actorSystemFig.getRegionLocal(), Shard.Type.DEFAULT);
Assert.assertEquals( numTimedout, queuedMessages.size() );
// and there should still be some messages in the INFLIGHT table
- inflightMessages = getDatabaseQueueMessages(
+ inflightMessages = getDatabaseQueueMessages(
cassandraClient, queueName, actorSystemFig.getRegionLocal(), Shard.Type.INFLIGHT );
Assert.assertEquals( numInflight - numTimedout, inflightMessages.size() );
@@ -132,13 +133,13 @@ public class QueueTimeouterTest extends AbstractTest {
private List<DatabaseQueueMessage> getDatabaseQueueMessages(
CassandraClient cassandraClient, String queueName, String region, Shard.Type type ) {
-
+
ShardIterator shardIterator = new ShardIterator(
cassandraClient, queueName, region, type, Optional.empty() );
DatabaseQueueMessage.Type dbqmType = Shard.Type.DEFAULT.equals( type ) ?
DatabaseQueueMessage.Type.DEFAULT : DatabaseQueueMessage.Type.INFLIGHT;
-
+
MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
cassandraClient, queueName, region, dbqmType, shardIterator, null);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
index d486c80..ae62c89 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
@@ -42,6 +42,7 @@ import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService
import org.apache.usergrid.persistence.queue.TestModule;
import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,7 +67,6 @@ public class ShardAllocatorTest extends AbstractTest {
Injector injector = getInjector();
CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
injector.getInstance( App.class ); // init the INJECTOR
@@ -163,12 +163,12 @@ public class ShardAllocatorTest extends AbstractTest {
@Test
+ @Ignore
public void testBasicOperationWithMessages() throws InterruptedException {
Injector injector = getInjector();
CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
injector.getInstance( App.class ); // init the INJECTOR
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
index 76e3279..2d8da6d 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
@@ -19,6 +19,7 @@
package org.apache.usergrid.persistence.qakka.serialization;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
import org.apache.usergrid.persistence.qakka.AbstractTest;
import org.apache.usergrid.persistence.qakka.core.CassandraClient;
@@ -44,17 +45,17 @@ import static org.junit.Assert.assertEquals;
public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest {
private static final Logger logger = LoggerFactory.getLogger( MultiShardDatabaseQueueMessageIteratorTest.class );
-
+
@Test
public void testIterator() throws InterruptedException {
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+ CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
+ ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
- ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient );
-
QueueMessageSerialization queueMessageSerialization =
getInjector().getInstance( QueueMessageSerialization.class );
-
+
Shard shard1 = new Shard("test", "region", Shard.Type.DEFAULT, 1L, null);
Shard shard2 = new Shard("test", "region", Shard.Type.DEFAULT, 2L, null);
Shard shard3 = new Shard("test", "region", Shard.Type.DEFAULT, 3L, null);
@@ -71,7 +72,7 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest {
for(int i=0; i < numMessagesPerShard; i++){
queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(),
- DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard1.getShardId(),
+ DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard1.getShardId(),
System.currentTimeMillis(), null, null));
Thread.sleep(3);
}
@@ -79,7 +80,7 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest {
for(int i=0; i < numMessagesPerShard; i++){
queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(),
- DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard2.getShardId(),
+ DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard2.getShardId(),
System.currentTimeMillis(), null, null));
Thread.sleep(3);
}
@@ -87,7 +88,7 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest {
for(int i=0; i < numMessagesPerShard; i++){
queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(),
- DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard3.getShardId(),
+ DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard3.getShardId(),
System.currentTimeMillis(), null, null));
Thread.sleep(3);
}
@@ -95,7 +96,7 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest {
for(int i=0; i < numMessagesPerShard; i++){
queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(),
- DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard4.getShardId(),
+ DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard4.getShardId(),
System.currentTimeMillis(), null, null));
Thread.sleep(3);
}
@@ -103,12 +104,12 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest {
ShardIterator shardIterator = new ShardIterator(
cassandraClient, "test", "region", Shard.Type.DEFAULT, Optional.empty());
- MultiShardMessageIterator iterator = new MultiShardMessageIterator(
+ MultiShardMessageIterator iterator = new MultiShardMessageIterator(
cassandraClient, "test", "region", DatabaseQueueMessage.Type.DEFAULT, shardIterator, null);
- final AtomicInteger[] counts = {
+ final AtomicInteger[] counts = {
new AtomicInteger(0), new AtomicInteger(0), new AtomicInteger(0), new AtomicInteger(0) };
-
+
iterator.forEachRemaining(message -> {
//logger.info("Shard ID: {}, DatabaseQueueMessage ID: {}", message.getShardId(), message.getMessageId());
counts[ (int)(message.getShardId() - 1) ] .incrementAndGet();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java
index 072fd94..4d60772 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java
@@ -39,7 +39,6 @@ public class AuditLogSerializationTest extends AbstractTest {
public void testRecordAuditLog() throws Exception {
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
AuditLogSerialization logSerialization = getInjector().getInstance( AuditLogSerialization.class );
@@ -48,10 +47,10 @@ public class AuditLogSerializationTest extends AbstractTest {
String queueName = "alst_queue_" + RandomStringUtils.randomAlphanumeric( 15 );
String source = RandomStringUtils.randomAlphanumeric( 15 );
String dest = RandomStringUtils.randomAlphanumeric( 15 );
-
- logSerialization.recordAuditLog( AuditLog.Action.GET, AuditLog.Status.SUCCESS,
+
+ logSerialization.recordAuditLog( AuditLog.Action.GET, AuditLog.Status.SUCCESS,
queueName, dest, messageId, UUIDGen.getTimeUUID() );
-
+
// get audit logs for that message
Result<AuditLog> result = logSerialization.getAuditLogs( messageId );
Assert.assertEquals( 1, result.getEntities().size() );
@@ -61,8 +60,6 @@ public class AuditLogSerializationTest extends AbstractTest {
public void testGetAuditLogs() throws Exception {
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
-
AuditLogSerialization logSerialization = getInjector().getInstance( AuditLogSerialization.class );
@@ -73,14 +70,14 @@ public class AuditLogSerializationTest extends AbstractTest {
String dest = RandomStringUtils.randomAlphanumeric( 15 );
int numLogs = 10;
-
+
UUID queueMessageId1 = UUIDGen.getTimeUUID();
for ( int i=0; i<numLogs; i++ ) {
logSerialization.recordAuditLog( AuditLog.Action.GET, AuditLog.Status.SUCCESS,
queueName, dest, messageId, queueMessageId1 );
- Thread.sleep(5);
+ Thread.sleep(5);
}
-
+
UUID queueMessageId2 = UUIDGen.getTimeUUID();
for ( int i=0; i<numLogs; i++ ) {
logSerialization.recordAuditLog( AuditLog.Action.GET, AuditLog.Status.SUCCESS,
@@ -99,4 +96,4 @@ public class AuditLogSerializationTest extends AbstractTest {
Result<AuditLog> result = logSerialization.getAuditLogs( messageId );
Assert.assertEquals( numLogs * 3, result.getEntities().size() );
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java
index e50bae5..2100c80 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java
@@ -36,7 +36,6 @@ public class DatabaseQueueSerializationTest extends AbstractTest {
public void writeQueue(){
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
QueueSerialization queueSerialization = getInjector().getInstance( QueueSerialization.class );
DatabaseQueue queue = new DatabaseQueue("test", "west", "west", 0L, 0, 0, "test_dlq");
@@ -51,7 +50,6 @@ public class DatabaseQueueSerializationTest extends AbstractTest {
public void loadQueue(){
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
QueueSerialization queueSerialization = getInjector().getInstance( QueueSerialization.class );
DatabaseQueue queue = new DatabaseQueue("test1", "west", "west", 0L, 0, 0, "test_dlq");
@@ -68,7 +66,6 @@ public class DatabaseQueueSerializationTest extends AbstractTest {
public void deleteQueue(){
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
QueueSerialization queueSerialization = getInjector().getInstance( QueueSerialization.class );
DatabaseQueue queue = new DatabaseQueue("test1", "west", "west", 0L, 0, 0, "test_dlq");
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
index 3152025..f9c2951 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
@@ -32,34 +32,33 @@ import static org.junit.Assert.fail;
public class ShardCounterSerializationTest extends AbstractTest {
-
+
@Test
public void testBasicOperation() throws Exception {
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
- ShardCounterSerialization scs = getInjector().getInstance( ShardCounterSerialization.class );
-
+ ShardCounterSerialization scs = getInjector().getInstance( ShardCounterSerialization.class );
+
String queueName = "scst_queue_" + RandomStringUtils.randomAlphanumeric( 20 );
long shardId = 100L;
-
+
try {
scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId );
fail("Should have throw NotFoundException");
} catch ( NotFoundException expected ) {
- // pass
+ // pass
}
scs.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, 10 );
Assert.assertEquals( 10, scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) );
-
+
scs.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, 50 );
Assert.assertEquals( 60, scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) );
-
+
scs.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, 150 );
Assert.assertEquals( 210, scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) );
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
index 572c897..fb0a46e 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.persistence.qakka.serialization.sharding;
import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl;
import org.apache.usergrid.persistence.qakka.AbstractTest;
@@ -46,7 +47,8 @@ public class ShardIteratorTest extends AbstractTest {
public void getActiveShards(){
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient );
+ CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
+ ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
Shard shard2 = new Shard("test", "region1", Shard.Type.DEFAULT, 200L, null);
@@ -76,8 +78,9 @@ public class ShardIteratorTest extends AbstractTest {
public void seekActiveShards(){
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient );
-
+ CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
+ ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
+
Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
Shard shard2 = new Shard("test", "region1", Shard.Type.DEFAULT, 200L, null);
Shard shard3 = new Shard("test", "region1", Shard.Type.DEFAULT, 300L, null);
@@ -107,21 +110,22 @@ public class ShardIteratorTest extends AbstractTest {
public void shardIteratorOrdering() throws Exception {
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient );
+ CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
+ ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
int numShards = 10;
String region = "default";
String queueName = "sit_queue_" + RandomStringUtils.randomAlphanumeric(20);
-
+
for ( long i=0; i<numShards; i++) {
UUID messageId = QakkaUtils.getTimeUuid();
Shard shard = new Shard( queueName, region, Shard.Type.DEFAULT, i+1, messageId );
shardSerialization.createShard( shard );
try { Thread.sleep(10); } catch (Exception intentionallyIgnored) {}
}
-
+
Iterator<Shard> shardIterator = new ShardIterator(
- cassandraClient, queueName, region, Shard.Type.DEFAULT, Optional.empty());
+ cassandraClient, queueName, region, Shard.Type.DEFAULT, Optional.empty());
int count = 0;
Long prevTimestamp = null;
@@ -133,7 +137,7 @@ public class ShardIteratorTest extends AbstractTest {
prevTimestamp = shard.getPointer().timestamp();
count++;
}
-
+
Assert.assertEquals( numShards, count );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java
index e1a541b..e67db28 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java
@@ -19,6 +19,7 @@
package org.apache.usergrid.persistence.qakka.serialization.sharding;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
import org.apache.usergrid.persistence.qakka.AbstractTest;
import org.apache.usergrid.persistence.qakka.core.CassandraClient;
@@ -46,7 +47,8 @@ public class ShardSerializationTest extends AbstractTest {
public void writeNewShard(){
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient );
+ CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
+ ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
shardSerialization.createShard(shard1);
@@ -56,7 +58,8 @@ public class ShardSerializationTest extends AbstractTest {
public void deleteShard(){
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient );
+ CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
+ ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
@@ -72,7 +75,8 @@ public class ShardSerializationTest extends AbstractTest {
public void loadNullShard(){
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient );
+ CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
+ ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
Shard shard1 = new Shard("junk", "region1", Shard.Type.DEFAULT, 100L, null);
@@ -86,8 +90,9 @@ public class ShardSerializationTest extends AbstractTest {
public void updatePointer(){
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient );
-
+ CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
+ ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
+
Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
shardSerialization.createShard(shard1);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java
index ea73abc..7338a42 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java
@@ -41,7 +41,6 @@ public class ShardStrategyTest extends AbstractTest {
public void testBasicOperation() {
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
ShardSerialization shardSer = getInjector().getInstance( ShardSerialization.class );
@@ -49,7 +48,7 @@ public class ShardStrategyTest extends AbstractTest {
UUID messageIdToLocate = null;
long selectedShardId = 4L;
-
+
int numShards = 10;
String region = "default";
String queueName = "sst_queue_" + RandomStringUtils.randomAlphanumeric(20);
@@ -66,6 +65,6 @@ public class ShardStrategyTest extends AbstractTest {
Shard selectedShard = shardStrategy.selectShard( queueName, region, Shard.Type.DEFAULT, messageIdToLocate );
Assert.assertEquals( selectedShardId, selectedShard.getShardId() );
-
+
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java
index 20b72b0..306dfee 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java
@@ -42,16 +42,15 @@ public class TransferLogSerializationTest extends AbstractTest {
public void recordTransferLog() throws Exception {
TransferLogSerialization logSerialization = getInjector().getInstance( TransferLogSerialization.class );
-
+
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
String queueName = "tlst_queue_" + RandomStringUtils.randomAlphanumeric( 15 );
String source = RandomStringUtils.randomAlphanumeric( 15 );
String dest = RandomStringUtils.randomAlphanumeric( 15 );
-
+
int numLogs = 100;
-
+
for ( int i=0; i<numLogs; i++ ) {
logSerialization.recordTransferLog( queueName, source, dest, UUIDGen.getTimeUUID());
}
@@ -60,9 +59,9 @@ public class TransferLogSerializationTest extends AbstractTest {
int fetchCount = 0;
PagingState pagingState = null;
while ( true ) {
-
+
Result<TransferLog> all = logSerialization.getAllTransferLogs( pagingState, 10 );
-
+
// we only want entities for our queue
List<TransferLog> logs = all.getEntities().stream()
.filter( log -> log.getQueueName().equals( queueName ) ).collect( Collectors.toList() );
@@ -71,7 +70,7 @@ public class TransferLogSerializationTest extends AbstractTest {
fetchCount++;
if ( all.getPagingState() == null ) {
break;
- }
+ }
pagingState = all.getPagingState();
}
@@ -84,8 +83,7 @@ public class TransferLogSerializationTest extends AbstractTest {
TransferLogSerialization logSerialization = getInjector().getInstance( TransferLogSerialization.class );
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
-
+
String queueName = "tlst_queue_" + RandomStringUtils.randomAlphanumeric( 15 );
String source = RandomStringUtils.randomAlphanumeric( 15 );
String dest = RandomStringUtils.randomAlphanumeric( 15 );
@@ -101,16 +99,16 @@ public class TransferLogSerializationTest extends AbstractTest {
Assert.assertEquals( 1, logs.size());
logSerialization.removeTransferLog( queueName, source, dest, messageId );
-
+
List<TransferLog> all = getTransferLogs( logSerialization );
logs = all.stream()
.filter( log -> log.getQueueName().equals( queueName ) ).collect( Collectors.toList() );
Assert.assertEquals( 0, logs.size());
-
+
try {
logSerialization.removeTransferLog( queueName, source, dest, messageId );
Assert.fail("Removing non-existent log should throw exception");
-
+
} catch ( QakkaException expected ) {
// success!
}
@@ -130,4 +128,4 @@ public class TransferLogSerializationTest extends AbstractTest {
return allLogs;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
index bc01b23..5800bba 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
@@ -25,11 +25,9 @@ import com.google.inject.Injector;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
import org.apache.usergrid.persistence.qakka.AbstractTest;
import org.apache.usergrid.persistence.qakka.App;
-import org.apache.usergrid.persistence.qakka.QakkaModule;
import org.apache.usergrid.persistence.qakka.core.CassandraClient;
import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
-import org.apache.usergrid.persistence.queue.guice.QueueModule;
import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
import org.junit.Ignore;
import org.junit.Test;
@@ -60,7 +58,6 @@ public class LegacyQueueManagerTest extends AbstractTest {
Injector myInjector = getInjector();
CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class );
String region = actorSystemFig.getRegionLocal();
@@ -99,7 +96,6 @@ public class LegacyQueueManagerTest extends AbstractTest {
Injector myInjector = getInjector();
CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class );
String region = actorSystemFig.getRegionLocal();
@@ -142,7 +138,6 @@ public class LegacyQueueManagerTest extends AbstractTest {
Injector myInjector = getInjector();
CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class );
- cassandraClient.getSession();
ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class );
String region = actorSystemFig.getRegionLocal();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index c62b0df..9140637 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -34,19 +34,25 @@ usergrid.cluster.seeds=us-east:localhost
# Port used for cluster communications.
usergrid.cluster.port=2551
-queue.sender.num.actors=10
-queue.writer.num.actors=10
-queue.num.actors=10
+queue.sender.num.actors=20
+queue.writer.num.actors=20
+queue.num.actors=20
# set shard size and times low for testing purposes
queue.shard.max.size=500
-queue.shard.allocation.check.frequency.millis=100
-queue.shard.allocation.advance.time.millis=200
+queue.shard.allocation.check.frequency.millis=1000
+queue.shard.allocation.advance.time.millis=2000
+queue.refresh.millis=1000
queue.max.inmemory.shard.counter = 100
+cassandra.connections=10
+#cassandra.timeout=20000
+
cassandra.hosts=localhost
-cassandra.keyspace.application=qakka_test
+cassandra.keyspace.application=qakka_test_application
+
+cassandra.keyspace.queue-message=qakka_test_queue_messages
cassandra.keyspace-drop-and-create=true