You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/16 22:34:37 UTC
[08/25] usergrid git commit: Initial integration of Qakka into
Usergrid Queue module,
and implementation of Qakka-based LegacyQueueManager implementation.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
new file mode 100644
index 0000000..c154067
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.serialization.Result;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.QakkaModule;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLog;
+import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+
+/**
+ * Integration tests for {@link QueueMessageManager}: sending, fetching, acking and timing out
+ * queue messages using the services wired up by {@link QakkaModule}.
+ *
+ * Each test obtains a Cassandra session, starts the {@link App} on the next free Akka port and
+ * works against a freshly created, randomly named queue.
+ */
+public class QueueMessageManagerTest extends AbstractTest {
+    private static final Logger logger = LoggerFactory.getLogger( QueueMessageManagerTest.class );
+
+    // TODO: test that multiple threads pulling from same queue will never pop same item
+
+    /** Created on first use by {@link #getInjector()} and reused afterwards. */
+    protected Injector myInjector = null;
+
+    /**
+     * Returns the Guice injector for this test instance, creating it from a new
+     * {@link QakkaModule} on the first call.
+     */
+    @Override
+    protected Injector getInjector() {
+        if ( myInjector == null ) {
+            myInjector = Guice.createInjector( new QakkaModule() );
+        }
+        return myInjector;
+    }
+
+
+    /**
+     * Sends one JSON message, fetches it, checks the stored body and content type, acks it and
+     * verifies that it can no longer be loaded as either a DEFAULT or an INFLIGHT message.
+     */
+    @Test
+    public void testBasicOperation() throws Exception {
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+        DistributedQueueService distributedQueueService =
+            getInjector().getInstance( DistributedQueueService.class );
+        ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
+
+        String region = actorSystemFig.getRegionLocal();
+        App app = getInjector().getInstance( App.class );
+        app.start( "localhost", getNextAkkaPort(), region );
+
+        // create queue and send one message to it
+        String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric(15);
+        QueueManager queueManager = getInjector().getInstance( QueueManager.class );
+        QueueMessageManager qmm = getInjector().getInstance( QueueMessageManager.class );
+        queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
+        String jsonData = "{}";
+        qmm.sendMessages( queueName, Collections.singletonList(region), null, null,
+            "application/json", DataType.serializeValue( jsonData, ProtocolVersion.NEWEST_SUPPORTED) );
+
+        // ask the service to refresh, then give it a moment before reading
+        distributedQueueService.refresh();
+        Thread.sleep(1000);
+
+        // get message from the queue
+        List<QueueMessage> messages = qmm.getNextMessages( queueName, 1 );
+        Assert.assertEquals( 1, messages.size() );
+        QueueMessage message = messages.get(0);
+
+        // test that queue message data is present and correct
+        QueueMessageSerialization qms = getInjector().getInstance( QueueMessageSerialization.class );
+        DatabaseQueueMessageBody data = qms.loadMessageData( message.getMessageId() );
+        Assert.assertNotNull( data );
+        Assert.assertEquals( "application/json", data.getContentType() );
+        String jsonDataReturned = new String( data.getBlob().array(), Charset.forName("UTF-8") );
+        Assert.assertEquals( jsonData, jsonDataReturned );
+
+        // test that transfer log is empty for our queue
+        TransferLogSerialization tlogs = getInjector().getInstance( TransferLogSerialization.class );
+        Result<TransferLog> all = tlogs.getAllTransferLogs( null, 1000 );
+        List<TransferLog> logs = all.getEntities().stream()
+            .filter( log -> log.getQueueName().equals( queueName ) )
+            .collect( Collectors.toList() );
+        Assert.assertTrue( logs.isEmpty() );
+
+        // ack the message
+        qmm.ackMessage( queueName, message.getQueueMessageId() );
+
+        // test that message is no longer stored in non-replicated keyspace
+
+        Assert.assertNull( qms.loadMessage( queueName, region, null,
+            DatabaseQueueMessage.Type.DEFAULT, message.getQueueMessageId() ));
+
+        Assert.assertNull( qms.loadMessage( queueName, region, null,
+            DatabaseQueueMessage.Type.INFLIGHT, message.getQueueMessageId() ));
+
+        // test that audit log entries were written
+        // NOTE(review): three entries expected, presumably one each for send, get and ack - confirm
+        AuditLogSerialization auditLogSerialization =
+            getInjector().getInstance( AuditLogSerialization.class );
+        Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
+        Assert.assertEquals( 3, auditLogs.getEntities().size() );
+    }
+
+
+    /**
+     * Fetches a batch of messages, acks the first half, waits for longer than the configured
+     * queue timeout and then expects an ack of each remaining message to be rejected with a
+     * {@link QakkaRuntimeException}.
+     */
+    @Test
+    public void testQueueMessageTimeouts() throws Exception {
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+        DistributedQueueService distributedQueueService =
+            getInjector().getInstance( DistributedQueueService.class );
+        QakkaFig qakkaFig = getInjector().getInstance( QakkaFig.class );
+        ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
+
+        String region = actorSystemFig.getRegionLocal();
+        App app = getInjector().getInstance( App.class );
+        app.start( "localhost", getNextAkkaPort(), region );
+
+        // create some number of queue messages
+
+        QueueManager queueManager = getInjector().getInstance( QueueManager.class );
+        QueueMessageManager qmm = getInjector().getInstance( QueueMessageManager.class );
+        String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric(15);
+        queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
+
+        int numMessages = 40;
+
+        for ( int i=0; i<numMessages; i++ ) {
+            qmm.sendMessages(
+                queueName,
+                Collections.singletonList( region ),
+                null, // delay
+                null, // expiration
+                "application/json",
+                DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) );
+        }
+
+        distributedQueueService.refresh();
+        Thread.sleep(1000);
+
+        // get all messages from queue
+
+        List<QueueMessage> messages = qmm.getNextMessages( queueName, numMessages );
+        Assert.assertEquals( numMessages, messages.size() );
+
+        // ack the first half of the messages; the second half is left un-acked
+
+        int half = numMessages / 2;
+        for ( QueueMessage acked : messages.subList( 0, half ) ) {
+            qmm.ackMessage( queueName, acked.getQueueMessageId() );
+        }
+
+        // copy the un-acked half rather than mutating the list returned by the manager
+        List<QueueMessage> unacked = new ArrayList<>( messages.subList( half, messages.size() ) );
+
+        // wait for twice timeout period
+
+        Thread.sleep( 2 * qakkaFig.getQueueTimeoutSeconds()*1000 );
+
+        distributedQueueService.processTimeouts();
+
+        Thread.sleep( qakkaFig.getQueueTimeoutSeconds()*1000 );
+
+        // attempt to ack other half of messages
+
+        for ( QueueMessage message : unacked ) {
+            try {
+                qmm.ackMessage( queueName, message.getQueueMessageId() );
+                Assert.fail("Message should have timed out by now");
+
+            } catch ( QakkaRuntimeException expected ) {
+                // keep on going...
+            }
+        }
+    }
+
+
+    /**
+     * Writes queue messages directly through {@link QueueMessageSerialization}, storing a
+     * message body for only every other one, and then verifies that half of the messages can
+     * still be fetched one at a time.
+     */
+    @Test
+    public void testGetWithMissingData() throws InterruptedException {
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+        getInjector().getInstance( App.class ); // init the INJECTOR
+
+        ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
+        DistributedQueueService qas = getInjector().getInstance( DistributedQueueService.class );
+        QueueManager qm = getInjector().getInstance( QueueManager.class );
+        QueueMessageManager qmm = getInjector().getInstance( QueueMessageManager.class );
+        QueueMessageSerialization qms = getInjector().getInstance( QueueMessageSerialization.class );
+
+        String region = actorSystemFig.getRegionLocal();
+        App app = getInjector().getInstance( App.class );
+        app.start( "localhost", getNextAkkaPort(), region );
+
+        // create queue messages, every other one with missing data
+
+        int numMessages = 100;
+        String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        qm.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
+
+        for ( int i=0; i<numMessages; i++ ) {
+
+            final UUID messageId = QakkaUtils.getTimeUuid();
+
+            if ( i % 2 == 0 ) { // only even-indexed messages get a stored body
+                final String data = "my test data";
+                final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
+                    DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" );
+                qms.writeMessageData( messageId, messageBody );
+            }
+
+            UUID queueMessageId = QakkaUtils.getTimeUuid();
+
+            DatabaseQueueMessage message = new DatabaseQueueMessage(
+                messageId,
+                DatabaseQueueMessage.Type.DEFAULT,
+                queueName,
+                actorSystemFig.getRegionLocal(),
+                null,
+                System.currentTimeMillis(),
+                null,
+                queueMessageId);
+            qms.writeMessage( message );
+        }
+
+        qas.refresh();
+        Thread.sleep(1000);
+
+        // Fetch until half of the written messages have been received. The counter is advanced
+        // only by the number of messages actually returned; it used to be incremented a second
+        // time inside the log call, which ended the loop after a quarter of the messages.
+        int count = 0;
+        while ( count < numMessages / 2 ) {
+            List<QueueMessage> messages = qmm.getNextMessages( queueName, 1 );
+            Assert.assertFalse( messages.isEmpty() );
+            count += messages.size();
+            logger.debug("Got {} messages", count);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
new file mode 100644
index 0000000..829ba27
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.QakkaModule;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.core.Queue;
+import org.apache.usergrid.persistence.qakka.core.QueueManager;
+import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.UUID;
+
+
+/**
+ * Integration tests that drive {@link DistributedQueueService} directly: message bodies are
+ * written through {@link QueueMessageSerialization}, sent to the local region and then read
+ * back with {@code getNextMessages}.
+ */
+public class QueueActorServiceTest extends AbstractTest {
+    private static final Logger logger = LoggerFactory.getLogger( QueueActorServiceTest.class );
+
+    /** Created on first use by {@link #getInjector()} and reused afterwards. */
+    protected Injector myInjector = null;
+
+    /** Returns this test's injector, building it from a new {@link QakkaModule} on first call. */
+    @Override
+    protected Injector getInjector() {
+        if ( myInjector != null ) {
+            return myInjector;
+        }
+        myInjector = Guice.createInjector( new QakkaModule() );
+        return myInjector;
+    }
+
+
+    /**
+     * Sends a single message to the local region and checks that exactly one message comes back
+     * and that its stored body decodes to the text that was written.
+     */
+    @Test
+    public void testBasicOperation() throws Exception {
+
+        CassandraClient client = getInjector().getInstance( CassandraClientImpl.class );
+        client.getSession();
+
+        ActorSystemFig fig = getInjector().getInstance( ActorSystemFig.class );
+        String region = fig.getRegionLocal();
+
+        App app = getInjector().getInstance( App.class );
+        app.start( "localhost", getNextAkkaPort(), region );
+
+        DistributedQueueService queueService =
+            getInjector().getInstance( DistributedQueueService.class );
+        QueueMessageSerialization messageStore =
+            getInjector().getInstance( QueueMessageSerialization.class );
+
+        String queueName = "testqueue_" + UUID.randomUUID();
+        QueueManager queueManager = getInjector().getInstance( QueueManager.class );
+        Queue queue = new Queue( queueName, "test-type", region, region, 0L, 5, 10, null );
+        queueManager.createQueue( queue );
+
+        // store one message body, then enqueue a message that refers to it
+        UUID messageId = UUIDGen.getTimeUUID();
+
+        final String data = "my test data";
+        final ByteBuffer serialized = DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED );
+        messageStore.writeMessageData( messageId, new DatabaseQueueMessageBody( serialized, "text/plain" ) );
+
+        queueService.sendMessageToRegion( queueName, region, region, messageId, null, null );
+
+        queueService.refresh();
+        Thread.sleep(1000);
+
+        // exactly one message is expected back
+        Collection<DatabaseQueueMessage> returned = queueService.getNextMessages( queueName, 1 );
+        Assert.assertEquals( 1, returned.size() );
+
+        // load the body for the returned message and compare it with what was written
+        DatabaseQueueMessage first = returned.iterator().next();
+        DatabaseQueueMessageBody body = messageStore.loadMessageData( first.getMessageId() );
+        ByteBuffer blob = body.getBlob();
+
+        String returnedData = new String( blob.array(), "UTF-8");
+
+        Assert.assertEquals( data, returnedData );
+
+    }
+
+
+    /**
+     * Sends 100 messages, waits (with retries) until the in-memory queue holds all of them and
+     * then drains the queue in four batches of 25, checking the in-memory size after each batch.
+     */
+    @Test
+    public void testGetMultipleQueueMessages() throws InterruptedException {
+
+        CassandraClient client = getInjector().getInstance( CassandraClientImpl.class );
+        client.getSession();
+
+        ActorSystemFig fig = getInjector().getInstance( ActorSystemFig.class );
+        String region = fig.getRegionLocal();
+
+        App app = getInjector().getInstance( App.class );
+        app.start("localhost", getNextAkkaPort(), region);
+
+        DistributedQueueService queueService =
+            getInjector().getInstance( DistributedQueueService.class );
+        QueueMessageSerialization messageStore =
+            getInjector().getInstance( QueueMessageSerialization.class );
+        InMemoryQueue inMemoryQueue = getInjector().getInstance( InMemoryQueue.class );
+
+        String queueName = "testqueue_" + UUID.randomUUID();
+        QueueManager queueManager = getInjector().getInstance( QueueManager.class );
+        queueManager.createQueue(
+            new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
+
+        final int totalMessages = 100;
+        final int batchSize = 25;
+
+        for ( int sent = 0; sent < totalMessages; sent++ ) {
+
+            UUID messageId = UUIDGen.getTimeUUID();
+
+            final String data = "my test data";
+            final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
+                DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" );
+            messageStore.writeMessageData( messageId, messageBody );
+
+            queueService.sendMessageToRegion( queueName, region, region, messageId, null, null );
+        }
+
+        // refresh and poll until every message is in memory, giving up after a fixed number of tries
+        final int maxRetries = 15;
+        for ( int attempt = 0; attempt < maxRetries; attempt++ ) {
+            queueService.refresh();
+            Thread.sleep( 3000 );
+            if ( inMemoryQueue.size( queueName ) == totalMessages ) {
+                break;
+            }
+        }
+
+        Assert.assertEquals( totalMessages, inMemoryQueue.size( queueName ) );
+
+        // drain in equal batches; after each batch the in-memory queue must shrink by one batch
+        for ( int remaining = totalMessages - batchSize; remaining >= 0; remaining -= batchSize ) {
+            Assert.assertEquals( batchSize, queueService.getNextMessages( queueName, batchSize ).size() );
+            Assert.assertEquals( remaining, inMemoryQueue.size( queueName ) );
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
new file mode 100644
index 0000000..9e4128e
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.QakkaModule;
+import org.apache.usergrid.persistence.qakka.core.*;
+import org.apache.usergrid.persistence.qakka.serialization.Result;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.UUID;
+
+
+/**
+ * Integration tests for {@link QueueActorHelper}: loading queue messages, moving them inflight
+ * and acking them, using the services wired up by {@link QakkaModule}.
+ */
+public class QueueActorHelperTest extends AbstractTest {
+
+    /** Created on first use by {@link #getInjector()} and reused afterwards. */
+    protected Injector myInjector = null;
+
+    /** Returns this test's injector, building it from a new {@link QakkaModule} on first call. */
+    @Override
+    protected Injector getInjector() {
+        if ( myInjector == null ) {
+            myInjector = Guice.createInjector( new QakkaModule() );
+        }
+        return myInjector;
+    }
+
+
+    /**
+     * Obtains a Cassandra session and starts the {@link App} on the next free Akka port.
+     *
+     * @return the local region name reported by {@link ActorSystemFig}
+     */
+    private String startApp() {
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+        getInjector().getInstance( App.class ); // init the INJECTOR
+
+        ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
+        String region = actorSystemFig.getRegionLocal();
+
+        App app = getInjector().getInstance( App.class );
+        app.start( "localhost", getNextAkkaPort(), region );
+        return region;
+    }
+
+
+    /** Creates a queue with a random "qat_queue_" name and returns that name. */
+    private String createQueue() {
+        QueueManager queueManager = getInjector().getInstance( QueueManager.class );
+        String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        queueManager.createQueue( new Queue( queueName ) );
+        return queueName;
+    }
+
+
+    /**
+     * Builds a message of the given type with fresh time-UUIDs for its queue message ID and
+     * message ID, writes it through {@link QueueMessageSerialization} and returns it.
+     */
+    private DatabaseQueueMessage writeMessage(
+            String queueName, String region, DatabaseQueueMessage.Type type ) {
+
+        QueueMessageSerialization qms = getInjector().getInstance( QueueMessageSerialization.class );
+
+        // generate the queue message ID first, matching the order used before this helper existed
+        UUID queueMessageId = QakkaUtils.getTimeUuid();
+
+        DatabaseQueueMessage message = new DatabaseQueueMessage(
+            QakkaUtils.getTimeUuid(),
+            type,
+            queueName,
+            region,
+            null,
+            System.currentTimeMillis(),
+            null,
+            queueMessageId);
+        qms.writeMessage( message );
+        return message;
+    }
+
+
+    /** A message written as DEFAULT can be loaded back by queue name, ID and type. */
+    @Test
+    public void loadDatabaseQueueMessage() throws Exception {
+
+        String region = startApp();
+        String queueName = createQueue();
+
+        // write message
+
+        DatabaseQueueMessage message = writeMessage( queueName, region, DatabaseQueueMessage.Type.DEFAULT );
+
+        // load message
+
+        QueueActorHelper helper = getInjector().getInstance( QueueActorHelper.class );
+        DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage(
+            queueName, message.getQueueMessageId(), message.getType() );
+
+        Assert.assertNotNull( queueMessage );
+    }
+
+
+    /** Loading a message ID that was never written yields {@code null}. */
+    @Test
+    public void loadDatabaseQueueMessageNotFound() throws Exception {
+
+        startApp();
+        String queueName = createQueue();
+
+        // don't write any message
+
+        // load message
+
+        QueueActorHelper helper = getInjector().getInstance( QueueActorHelper.class );
+        DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage(
+            queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.DEFAULT );
+
+        Assert.assertNull( queueMessage );
+    }
+
+
+    /**
+     * Putting a DEFAULT message inflight removes it from the available messages, makes it
+     * loadable as INFLIGHT and records a single successful GET audit log entry.
+     */
+    @Test
+    public void putInflight() throws Exception {
+
+        String region = startApp();
+        String queueName = createQueue();
+        QueueMessageSerialization qms = getInjector().getInstance( QueueMessageSerialization.class );
+
+        // write message to messages_available table
+
+        DatabaseQueueMessage message = writeMessage( queueName, region, DatabaseQueueMessage.Type.DEFAULT );
+
+        // put message inflight
+
+        QueueActorHelper helper = getInjector().getInstance( QueueActorHelper.class );
+        helper.putInflight( queueName, message );
+
+        // message must be gone from messages_available table
+
+        Assert.assertNull( qms.loadMessage(
+            queueName,
+            region,
+            null,
+            DatabaseQueueMessage.Type.DEFAULT,
+            message.getQueueMessageId() ) );
+
+        // message must be present in messages_inflight table
+
+        Assert.assertNotNull( qms.loadMessage(
+            queueName,
+            region,
+            null,
+            DatabaseQueueMessage.Type.INFLIGHT,
+            message.getQueueMessageId() ) );
+
+        // there must be an audit log record of the successful get operation
+
+        AuditLogSerialization auditLogSerialization = getInjector().getInstance( AuditLogSerialization.class );
+        Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
+        Assert.assertEquals( 1, auditLogs.getEntities().size() );
+        Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get(0).getStatus() );
+        Assert.assertEquals( AuditLog.Action.GET, auditLogs.getEntities().get(0).getAction() );
+    }
+
+
+    /**
+     * Acking an INFLIGHT message removes it and records a single successful ACK audit log entry.
+     */
+    @Test
+    public void ackQueueMessage() throws Exception {
+
+        String region = startApp();
+        String queueName = createQueue();
+
+        // write message to messages_inflight table
+
+        DatabaseQueueMessage message = writeMessage( queueName, region, DatabaseQueueMessage.Type.INFLIGHT );
+        UUID queueMessageId = message.getQueueMessageId();
+
+        // ack message
+
+        QueueActorHelper helper = getInjector().getInstance( QueueActorHelper.class );
+        helper.ackQueueMessage( queueName, queueMessageId );
+
+        // The lookups below use the ID of the message that was acked. They previously used
+        // freshly generated UUIDs, which can never match a stored message, so the assertions
+        // could not fail.
+
+        // message must be gone from messages_inflight table
+
+        Assert.assertNull( helper.loadDatabaseQueueMessage(
+            queueName, queueMessageId, DatabaseQueueMessage.Type.INFLIGHT ));
+
+        // message must not be in messages_available table either
+
+        Assert.assertNull( helper.loadDatabaseQueueMessage(
+            queueName, queueMessageId, DatabaseQueueMessage.Type.DEFAULT ));
+
+        // there should be an audit log record of the successful ack operation
+
+        AuditLogSerialization auditLogSerialization = getInjector().getInstance( AuditLogSerialization.class );
+        Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
+        Assert.assertEquals( 1, auditLogs.getEntities().size() );
+        Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get(0).getStatus() );
+        Assert.assertEquals( AuditLog.Action.ACK, auditLogs.getEntities().get(0).getAction() );
+    }
+
+
+    /** Acking a queue message ID that was never written reports {@code BAD_REQUEST}. */
+    @Test
+    public void ackQueueMessageNotFound() throws Exception {
+
+        startApp();
+        String queueName = createQueue();
+
+        // don't write message, just make up some bogus IDs
+
+        UUID queueMessageId = QakkaUtils.getTimeUuid();
+
+        // ack message must fail
+
+        QueueActorHelper helper = getInjector().getInstance( QueueActorHelper.class );
+        Assert.assertEquals( DistributedQueueService.Status.BAD_REQUEST,
+            helper.ackQueueMessage( queueName, queueMessageId ));
+    }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
new file mode 100644
index 0000000..5f0be53
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+
+ /**
+  * Checks that a QueueRefresher actor that is sent a QueueRefreshRequest loads the messages
+  * written for a queue into the InMemoryQueue.
+  */
+ public class QueueReaderTest extends AbstractTest {
+     private static final Logger logger = LoggerFactory.getLogger( QueueReaderTest.class );
+
+     @Test
+     public void testBasicOperation() throws Exception {
+
+         CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+         cassandraClient.getSession();
+
+         getInjector().getInstance( App.class ); // init the INJECTOR
+
+         ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
+         ShardSerialization shardSerialization = getInjector().getInstance( ShardSerialization.class );
+         QueueMessageSerialization serialization = getInjector().getInstance( QueueMessageSerialization.class );
+
+         final int numMessages = 200;
+         final String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+         final String region = actorSystemFig.getRegionLocal();
+
+         // create a DEFAULT shard for the queue
+         Shard newShard = new Shard( queueName, region, Shard.Type.DEFAULT, 1L, QakkaUtils.getTimeUuid() );
+         shardSerialization.createShard( newShard );
+
+         // write the queue messages directly through the serialization layer
+         for ( int i = 0; i < numMessages; i++ ) {
+
+             UUID messageId = QakkaUtils.getTimeUuid();
+             UUID queueMessageId = QakkaUtils.getTimeUuid();
+
+             serialization.writeMessage( new DatabaseQueueMessage(
+                 messageId,
+                 DatabaseQueueMessage.Type.DEFAULT,
+                 queueName,
+                 region,
+                 null,
+                 System.currentTimeMillis(),
+                 null,
+                 queueMessageId ) );
+         }
+
+         InMemoryQueue inMemoryQueue = getInjector().getInstance( InMemoryQueue.class );
+         Assert.assertEquals( 0, inMemoryQueue.size( queueName ) );
+
+         // run the QueueRefresher to fill up the in-memory queue
+         ActorSystem system = ActorSystem.create( "Test-" + queueName );
+         ActorRef queueReaderRef = system.actorOf( Props.create( QueueRefresher.class, queueName ), "queueReader" );
+         QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName );
+         queueReaderRef.tell( refreshRequest, ActorRef.noSender() ); // tell sends message, returns immediately
+
+         // The refresh is asynchronous, so poll once a second until every written message is in
+         // memory. The loop waits on numMessages, the value asserted below, so it stops as soon as
+         // the assertion can pass instead of depending on the configured in-memory queue size.
+         int maxRetries = 10;
+         int retries = 0;
+         while ( inMemoryQueue.size( queueName ) < numMessages && retries++ < maxRetries ) {
+             Thread.sleep( 1000 );
+         }
+
+         Assert.assertEquals( numMessages, inMemoryQueue.size( queueName ) );
+     }
+ }
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
new file mode 100644
index 0000000..511b059
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueTimeoutRequest;
+import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+
+
+ /**
+  * Checks that the QueueTimeouter actor moves timed-out INFLIGHT messages back into the
+  * DEFAULT (available) table and leaves the remaining messages inflight.
+  */
+ public class QueueTimeouterTest extends AbstractTest {
+     private static final Logger logger = LoggerFactory.getLogger( QueueTimeouterTest.class );
+
+     /** Most messages the read-back helper will collect in one call. */
+     private static final int MAX_MESSAGES_READ = 2000;
+
+     @Test
+     public void testBasicOperation() throws Exception {
+
+         CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+         cassandraClient.getSession();
+
+         getInjector().getInstance( App.class ); // init the INJECTOR
+
+         QakkaFig qakkaFig = getInjector().getInstance( QakkaFig.class );
+         ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
+         QueueMessageSerialization qms = getInjector().getInstance( QueueMessageSerialization.class );
+         ShardSerialization shardSerialization = getInjector().getInstance( ShardSerialization.class );
+
+         // create records in inflight table, with some being old enough to time out
+         final int numInflight = 200; // number of messages to be put into the inflight table
+         final int numTimedout = 75;  // number of those messages that are to be timed out
+
+         // multiply by a long literal so the seconds-to-millis conversion is done in long arithmetic
+         final long timeoutMs = qakkaFig.getQueueTimeoutSeconds() * 1000L;
+
+         final String queueName = "qtt_queue_" + RandomStringUtils.randomAlphanumeric( 20 );
+         final String region = actorSystemFig.getRegionLocal();
+
+         // one shard per table: INFLIGHT first, then DEFAULT
+         shardSerialization.createShard(
+             new Shard( queueName, region, Shard.Type.INFLIGHT, 1L, QakkaUtils.getTimeUuid() ) );
+         shardSerialization.createShard(
+             new Shard( queueName, region, Shard.Type.DEFAULT, 1L, QakkaUtils.getTimeUuid() ) );
+
+         for ( int i = 0; i < numInflight; i++ ) {
+             // the first numTimedout messages are back-dated by one timeout period, the rest post-dated
+             final long now = System.currentTimeMillis();
+             final long created = i < numTimedout ? now - timeoutMs : now + timeoutMs;
+
+             UUID queueMessageId = QakkaUtils.getTimeUuid();
+             UUID messageId = QakkaUtils.getTimeUuid();
+
+             qms.writeMessage( new DatabaseQueueMessage(
+                 messageId,
+                 DatabaseQueueMessage.Type.INFLIGHT,
+                 queueName,
+                 region,
+                 null,
+                 created,
+                 created,
+                 queueMessageId ) );
+         }
+
+         List<DatabaseQueueMessage> inflightMessages = getDatabaseQueueMessages(
+             cassandraClient, queueName, region, Shard.Type.INFLIGHT );
+         Assert.assertEquals( numInflight, inflightMessages.size() );
+
+         // run timeouter actor
+         ActorSystem system = ActorSystem.create( "Test-" + queueName );
+         ActorRef timeouterRef = system.actorOf( Props.create( QueueTimeouter.class, queueName ), "timeouter" );
+         QueueTimeoutRequest qtr = new QueueTimeoutRequest( queueName );
+         timeouterRef.tell( qtr, ActorRef.noSender() ); // tell sends message, returns immediately
+
+         Thread.sleep( timeoutMs ); // wait one timeout period before checking the tables
+
+         // timed out messages should have been moved into available (DEFAULT) table
+         List<DatabaseQueueMessage> queuedMessages = getDatabaseQueueMessages(
+             cassandraClient, queueName, region, Shard.Type.DEFAULT );
+         Assert.assertEquals( numTimedout, queuedMessages.size() );
+
+         // and there should still be some messages in the INFLIGHT table
+         inflightMessages = getDatabaseQueueMessages(
+             cassandraClient, queueName, region, Shard.Type.INFLIGHT );
+         Assert.assertEquals( numInflight - numTimedout, inflightMessages.size() );
+     }
+
+     /**
+      * Reads back up to MAX_MESSAGES_READ stored messages of one shard type (DEFAULT, otherwise
+      * INFLIGHT) for a queue and region.
+      */
+     private List<DatabaseQueueMessage> getDatabaseQueueMessages(
+         CassandraClient cassandraClient, String queueName, String region, Shard.Type type ) {
+
+         ShardIterator shardIterator = new ShardIterator(
+             cassandraClient, queueName, region, type, Optional.empty() );
+         DatabaseQueueMessage.Type dbqmType = Shard.Type.DEFAULT.equals( type )
+             ? DatabaseQueueMessage.Type.DEFAULT : DatabaseQueueMessage.Type.INFLIGHT;
+         MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
+             cassandraClient, queueName, region, dbqmType, shardIterator, null );
+
+         List<DatabaseQueueMessage> messages = new ArrayList<>( MAX_MESSAGES_READ );
+         while ( multiShardIterator.hasNext() && messages.size() < MAX_MESSAGES_READ ) {
+             messages.add( multiShardIterator.next() );
+         }
+         return messages;
+     }
+ }
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
new file mode 100644
index 0000000..3dbd980
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.QakkaModule;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.*;
+import org.apache.usergrid.persistence.qakka.distributed.messages.ShardCheckRequest;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Optional;
+
+
+ /** Tests that the ShardAllocator actor adds a shard to a queue once the last shard is full. */
+ public class ShardAllocatorTest extends AbstractTest {
+     private static final Logger logger = LoggerFactory.getLogger( ShardAllocatorTest.class );
+
+     /** Injector used by this class in place of the inherited one; built on first use. */
+     protected Injector myInjector = null;
+
+     @Override
+     protected Injector getInjector() {
+         if ( myInjector == null ) {
+             myInjector = Guice.createInjector( new QakkaModule() );
+         }
+         return myInjector;
+     }
+
+     /** Checks the shard count after a ShardCheckRequest, before and after the last shard reaches max. */
+     @Test
+     public void testBasicOperation() throws InterruptedException {
+
+         CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+         cassandraClient.getSession();
+
+         getInjector().getInstance( App.class ); // init the INJECTOR
+
+         ShardSerialization shardSer = getInjector().getInstance( ShardSerialization.class );
+         QakkaFig qakkaFig = getInjector().getInstance( QakkaFig.class );
+         ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
+         ShardCounterSerialization shardCounterSer = getInjector().getInstance( ShardCounterSerialization.class );
+
+         String rando = RandomStringUtils.randomAlphanumeric( 20 );
+         String queueName = "queue_" + rando;
+         String region = actorSystemFig.getRegionLocal();
+
+         // Create a set of shards, each with max count
+
+         Shard lastShard = null;
+
+         int numShards = 4;
+         long maxPerShard = qakkaFig.getMaxShardSize();
+         long lastShardCount = (long)(0.8 * maxPerShard); // last shard starts 20% below max
+
+         for ( long shardId = 1; shardId < numShards + 1; shardId++ ) {
+
+             Shard shard = new Shard( queueName, region, Shard.Type.DEFAULT, shardId, QakkaUtils.getTimeUuid());
+             shardSer.createShard( shard );
+
+             if ( shardId != numShards ) {
+                 shardCounterSer.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, maxPerShard );
+             } else {
+                 // Create last shard with 20% less than max
+                 shardCounterSer.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, lastShardCount );
+                 lastShard = shard;
+             }
+
+             Thread.sleep( 10 );
+         }
+
+         Assert.assertEquals( numShards, countShards(
+             cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT ));
+
+         // Run shard allocator actor by sending message to it
+
+         ActorSystem system = ActorSystem.create("Test-" + queueName);
+         ActorRef shardAllocRef = system.actorOf( Props.create( ShardAllocator.class, queueName ), "shardallocator");
+
+         ShardCheckRequest checkRequest = new ShardCheckRequest( queueName );
+         shardAllocRef.tell( checkRequest, ActorRef.noSender() ); // tell sends message, returns immediately
+         Thread.sleep(1000);
+
+         // Test that no new shards created
+
+         Assert.assertEquals( numShards, countShards(
+             cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT ));
+
+         // Top the last shard up by exactly the amount still missing, so that its counter equals
+         // maxPerShard even when 0.8 * maxPerShard had to be truncated to a whole number
+         shardCounterSer.incrementCounter(
+             queueName, Shard.Type.DEFAULT, lastShard.getShardId(), maxPerShard - lastShardCount );
+
+         // Run shard allocator again
+
+         shardAllocRef.tell( checkRequest, ActorRef.noSender() ); // tell sends message, returns immediately
+         Thread.sleep(1000);
+
+         // Test that, this time, a new shard was created
+
+         Assert.assertEquals( numShards + 1, countShards(
+             cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT ));
+     }
+
+     /** Counts the shards of the given type for a queue and region, logging each shard's counter value. */
+     int countShards(
+         CassandraClient cassandraClient,
+         ShardCounterSerialization scs,
+         String queueName,
+         String region,
+         Shard.Type type ) {
+
+         ShardIterator shardIterator =
+             new ShardIterator( cassandraClient, queueName, region, type, Optional.empty() );
+
+         int shardCount = 0;
+         while ( shardIterator.hasNext() ) {
+             Shard s = shardIterator.next();
+             shardCount++;
+             long counterValue = scs.getCounterValue( s.getQueueName(), type, s.getShardId() );
+             logger.debug("Shard {} {} is #{} has count={}", type, s.getShardId(), shardCount, counterValue );
+
+         }
+
+         return shardCount;
+     }
+
+     /** Sends 4000 messages to a new queue and expects 8 DEFAULT shards to exist afterwards. */
+     @Test
+     public void testBasicOperationWithMessages() throws InterruptedException {
+
+         CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+         cassandraClient.getSession();
+
+         getInjector().getInstance( App.class ); // init the INJECTOR
+
+         ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
+         QueueManager queueManager = getInjector().getInstance( QueueManager.class );
+         QueueMessageManager queueMessageManager = getInjector().getInstance( QueueMessageManager.class );
+         DistributedQueueService distributedQueueService = getInjector().getInstance( DistributedQueueService.class );
+         ShardCounterSerialization shardCounterSer = getInjector().getInstance( ShardCounterSerialization.class );
+
+         // Start the app on localhost for the local region
+         String region = actorSystemFig.getRegionLocal();
+         App app = getInjector().getInstance( App.class );
+         app.start( "localhost", getNextAkkaPort(), region );
+
+         String rando = RandomStringUtils.randomAlphanumeric( 20 );
+         String queueName = "queue_" + rando;
+
+         queueManager.createQueue( new Queue( queueName ));
+
+         // Create 4000 messages
+
+         int numMessages = 4000;
+
+         for ( int i=0; i<numMessages; i++ ) {
+             queueMessageManager.sendMessages(
+                 queueName,
+                 Collections.singletonList( region ),
+                 null, // delay
+                 null, // expiration
+                 "application/json",
+                 DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) );
+         }
+
+         distributedQueueService.refresh();
+         Thread.sleep(3000);
+
+         // Test that 8 shards were created
+
+         Assert.assertEquals( 8,
+             countShards( cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT ));
+
+     }
+ }
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
new file mode 100644
index 0000000..76e3279
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization;
+
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Created by russo on 6/9/16.
+ */
+public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest {
+ private static final Logger logger = LoggerFactory.getLogger( MultiShardDatabaseQueueMessageIteratorTest.class );
+
+
+ @Test
+ public void testIterator() throws InterruptedException {
+
+ CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+
+ ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient );
+
+ QueueMessageSerialization queueMessageSerialization =
+ getInjector().getInstance( QueueMessageSerialization.class );
+
+ Shard shard1 = new Shard("test", "region", Shard.Type.DEFAULT, 1L, null);
+ Shard shard2 = new Shard("test", "region", Shard.Type.DEFAULT, 2L, null);
+ Shard shard3 = new Shard("test", "region", Shard.Type.DEFAULT, 3L, null);
+ Shard shard4 = new Shard("test", "region", Shard.Type.DEFAULT, 4L, null);
+
+ shardSerialization.createShard(shard1);
+ shardSerialization.createShard(shard2);
+ shardSerialization.createShard(shard3);
+ shardSerialization.createShard(shard4);
+
+ final int numMessagesPerShard = 50;
+
+ // just do these separately to space out the time UUIDs per shard
+ for(int i=0; i < numMessagesPerShard; i++){
+
+ queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(),
+ DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard1.getShardId(),
+ System.currentTimeMillis(), null, null));
+ Thread.sleep(3);
+ }
+
+ for(int i=0; i < numMessagesPerShard; i++){
+
+ queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(),
+ DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard2.getShardId(),
+ System.currentTimeMillis(), null, null));
+ Thread.sleep(3);
+ }
+
+ for(int i=0; i < numMessagesPerShard; i++){
+
+ queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(),
+ DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard3.getShardId(),
+ System.currentTimeMillis(), null, null));
+ Thread.sleep(3);
+ }
+
+ for(int i=0; i < numMessagesPerShard; i++){
+
+ queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(),
+ DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard4.getShardId(),
+ System.currentTimeMillis(), null, null));
+ Thread.sleep(3);
+ }
+
+
+ ShardIterator shardIterator = new ShardIterator(
+ cassandraClient, "test", "region", Shard.Type.DEFAULT, Optional.empty());
+ MultiShardMessageIterator iterator = new MultiShardMessageIterator(
+ cassandraClient, "test", "region", DatabaseQueueMessage.Type.DEFAULT, shardIterator, null);
+
+ final AtomicInteger[] counts = {
+ new AtomicInteger(0), new AtomicInteger(0), new AtomicInteger(0), new AtomicInteger(0) };
+
+ iterator.forEachRemaining(message -> {
+ //logger.info("Shard ID: {}, DatabaseQueueMessage ID: {}", message.getShardId(), message.getMessageId());
+ counts[ (int)(message.getShardId() - 1) ] .incrementAndGet();
+ });
+
+ logger.info("Total Count 1: {}", counts[0].get());
+ logger.info("Total Count 2: {}", counts[1].get());
+ logger.info("Total Count 3: {}", counts[2].get());
+ logger.info("Total Count 4: {}", counts[3].get());
+
+ assertEquals(numMessagesPerShard, counts[0].get());
+ assertEquals(numMessagesPerShard, counts[1].get());
+ assertEquals(numMessagesPerShard, counts[2].get());
+ assertEquals(numMessagesPerShard, counts[3].get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java
new file mode 100644
index 0000000..072fd94
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.auditlogs;
+
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.serialization.Result;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.UUID;
+
+
+ /**
+  * Tests recording audit log entries through AuditLogSerialization and reading them back by
+  * message ID.
+  */
+ public class AuditLogSerializationTest extends AbstractTest {
+
+     /** Records one audit log entry for a message and expects exactly one entry to be returned. */
+     @Test
+     public void testRecordAuditLog() throws Exception {
+
+         CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+         cassandraClient.getSession();
+
+         AuditLogSerialization logSerialization = getInjector().getInstance( AuditLogSerialization.class );
+
+         // record a single audit log for a message
+         UUID messageId = UUIDGen.getTimeUUID();
+         String queueName = "alst_queue_" + RandomStringUtils.randomAlphanumeric( 15 );
+         String dest = RandomStringUtils.randomAlphanumeric( 15 );
+
+         logSerialization.recordAuditLog( AuditLog.Action.GET, AuditLog.Status.SUCCESS,
+             queueName, dest, messageId, UUIDGen.getTimeUUID() );
+
+         // get audit logs for that message
+         Result<AuditLog> result = logSerialization.getAuditLogs( messageId );
+         Assert.assertEquals( 1, result.getEntities().size() );
+     }
+
+     /**
+      * Records the same number of audit log entries under each of three queue message IDs, all for
+      * one message ID, and expects every entry to be returned when querying by that message ID.
+      */
+     @Test
+     public void testGetAuditLogs() throws Exception {
+
+         CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+         cassandraClient.getSession();
+
+         AuditLogSerialization logSerialization = getInjector().getInstance( AuditLogSerialization.class );
+
+         // record some audit logs for a message
+         UUID messageId = UUIDGen.getTimeUUID();
+         String queueName = "alst_queue_" + RandomStringUtils.randomAlphanumeric( 15 );
+         String dest = RandomStringUtils.randomAlphanumeric( 15 );
+
+         // number of queue message IDs that share the one message ID
+         final int numQueueMessages = 3;
+         // number of audit log entries recorded per queue message ID
+         final int numLogs = 10;
+
+         for ( int q = 0; q < numQueueMessages; q++ ) {
+
+             // each queue message ID is generated right before its own batch of entries is written
+             UUID queueMessageId = UUIDGen.getTimeUUID();
+
+             for ( int i = 0; i < numLogs; i++ ) {
+                 logSerialization.recordAuditLog( AuditLog.Action.GET, AuditLog.Status.SUCCESS,
+                     queueName, dest, messageId, queueMessageId );
+                 Thread.sleep( 5 );
+             }
+         }
+
+         // test that we have numQueueMessages X numLogs entries for the messageId
+         Result<AuditLog> result = logSerialization.getAuditLogs( messageId );
+         Assert.assertEquals( numQueueMessages * numLogs, result.getEntities().size() );
+     }
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java
new file mode 100644
index 0000000..4ea6de3
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queuemessages;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.junit.Test;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+
+public class DatabaseQueueMessageSerializationTest extends AbstractTest {
+
+
+ static class ThingToSave implements Serializable {
+ String value;
+ }
+
+
+ @Test
+ public void writeNewMessage(){
+
+ QueueMessageSerialization queueMessageSerialization =
+ getInjector().getInstance( QueueMessageSerialization.class );
+
+ Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
+
+ DatabaseQueueMessage message1 = new DatabaseQueueMessage(QakkaUtils.getTimeUuid(),
+ DatabaseQueueMessage.Type.DEFAULT, "test", "region1",
+ shard1.getShardId(), System.currentTimeMillis(), null, null);
+
+ UUID queueMessageId = queueMessageSerialization.writeMessage(message1);
+ }
+
+ @Test
+ public void deleteMessage(){
+
+ QueueMessageSerialization queueMessageSerialization =
+ getInjector().getInstance( QueueMessageSerialization.class );
+
+ Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
+
+ UUID messageId = QakkaUtils.getTimeUuid();
+ String queueName = "dqmst_queue_" + RandomStringUtils.randomAlphanumeric( 20 );
+
+ DatabaseQueueMessage message = new DatabaseQueueMessage(
+ messageId,
+ DatabaseQueueMessage.Type.DEFAULT,
+ queueName,
+ "dummy_region",
+ shard1.getShardId(),
+ System.currentTimeMillis(),
+ null, null );
+
+ UUID queueMessageId = queueMessageSerialization.writeMessage( message );
+
+ queueMessageSerialization.deleteMessage(
+ queueName,
+ "dummy_region",
+ shard1.getShardId(),
+ DatabaseQueueMessage.Type.DEFAULT,
+ queueMessageId );
+
+ assertNull( queueMessageSerialization.loadMessage(
+ queueName,
+ "dummy_region",
+ shard1.getShardId(),
+ DatabaseQueueMessage.Type.DEFAULT,
+ queueMessageId
+ ));
+ }
+
+
+ @Test
+ public void loadNullMessage(){
+
+ QueueMessageSerialization queueMessageSerialization =
+ getInjector().getInstance( QueueMessageSerialization.class );
+
+ Shard shard1 = new Shard("junk", "region1", Shard.Type.DEFAULT, 100L, null);
+
+ assertNull( queueMessageSerialization.loadMessage(
+ RandomStringUtils.randomAlphanumeric( 20 ),
+ "dummy_region",
+ shard1.getShardId(),
+ DatabaseQueueMessage.Type.DEFAULT,
+ null
+ ));
+ }
+
+
+ @Test
+ public void writeNewMessageData(){
+
+ QueueMessageSerialization queueMessageSerialization =
+ getInjector().getInstance( QueueMessageSerialization.class );
+
+ Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
+
+ UUID messageId = QakkaUtils.getTimeUuid();
+
+ final String data = "my test data";
+
+ final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
+ DataType.serializeValue(data, ProtocolVersion.NEWEST_SUPPORTED), "text/plain");
+
+ queueMessageSerialization.writeMessageData(messageId, messageBody);
+
+ final DatabaseQueueMessageBody returnedData = queueMessageSerialization.loadMessageData( messageId );
+ }
+
+
+ @Test
+ public void loadMessageData() throws Exception {
+
+ QueueMessageSerialization queueMessageSerialization =
+ getInjector().getInstance( QueueMessageSerialization.class );
+
+ Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
+
+ UUID messageId = QakkaUtils.getTimeUuid();
+
+ final String data = "my test data";
+
+ final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody( DataType.serializeValue(data,
+ ProtocolVersion.NEWEST_SUPPORTED), "text/plain");
+
+ queueMessageSerialization.writeMessageData(messageId, messageBody);
+
+ final DatabaseQueueMessageBody returnedBody = queueMessageSerialization.loadMessageData( messageId );
+ String returnedData = new String( returnedBody.getBlob().array(), "UTF-8");
+
+ assertEquals(data, returnedData);
+ }
+
+
+ @Test
+ public void loadMessageObjectData() throws Exception {
+
+ QueueMessageSerialization queueMessageSerialization =
+ getInjector().getInstance( QueueMessageSerialization.class );
+
+ Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
+
+ UUID messageId = QakkaUtils.getTimeUuid();
+
+ final String data = "my test data";
+
+ final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody( DataType.serializeValue(data,
+ ProtocolVersion.NEWEST_SUPPORTED), "text/plain");
+
+ queueMessageSerialization.writeMessageData(messageId, messageBody);
+
+ final DatabaseQueueMessageBody returnedBody = queueMessageSerialization.loadMessageData( messageId );
+ String returnedData = new String( returnedBody.getBlob().array(), "UTF-8");
+
+ assertEquals(data, returnedData);
+ }
+
+
+
+
+ @Test
+ public void deleteMessageData() throws UnsupportedEncodingException {
+
+ QueueMessageSerialization queueMessageSerialization =
+ getInjector().getInstance( QueueMessageSerialization.class );
+
+ Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
+
+ UUID messageId = QakkaUtils.getTimeUuid();
+
+ final String data = "my test data";
+
+ final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody( DataType.serializeValue(data,
+ ProtocolVersion.NEWEST_SUPPORTED), "text/plain");
+
+ queueMessageSerialization.writeMessageData(messageId, messageBody);
+
+ final DatabaseQueueMessageBody returnedBody = queueMessageSerialization.loadMessageData( messageId );
+ final String returnedData = new String( returnedBody.getBlob().array(), "UTF-8");
+
+ assertEquals(data, returnedData);
+
+ queueMessageSerialization.deleteMessageData(messageId);
+
+ assertNull(queueMessageSerialization.loadMessageData( messageId ));
+
+
+ }
+
+
+ /**
+ * Persist to blob using Java serialization.
+ */
+ @Test
+ public void persistJavaObjectData() throws Exception {
+
+ QueueMessageSerialization queueMessageSerialization =
+ getInjector().getInstance( QueueMessageSerialization.class );
+
+ // serialize Java object to byte buffer
+
+ final ThingToSave data = new ThingToSave();
+ data.value = "my test data";
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeObject(data);
+ oos.flush();
+ oos.close();
+ ByteBuffer byteBuffer = ByteBuffer.wrap( bos.toByteArray() );
+
+ // write to Cassandra
+
+ final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
+ byteBuffer,"application/octet-stream");
+
+ UUID messageId = QakkaUtils.getTimeUuid();
+ queueMessageSerialization.writeMessageData(messageId, messageBody);
+
+ // load from Cassandra
+
+ final DatabaseQueueMessageBody returnedBody = queueMessageSerialization.loadMessageData( messageId );
+
+ // deserialize byte buffer
+
+ ByteBuffer messageData = returnedBody.getBlob();
+ ByteArrayInputStream bais = new ByteArrayInputStream( messageData.array() );
+
+ // throws exception -> java.io.StreamCorruptedException: invalid stream header: 00000000
+ ObjectInputStream ios = new ObjectInputStream( bais );
+ ThingToSave returnedData = (ThingToSave)ios.readObject();
+
+ assertEquals( data.value, returnedData.value );
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java
new file mode 100644
index 0000000..4690a1a
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queues;
+
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Exercises {@link QueueSerialization}: writing a {@link DatabaseQueue}, reading it back by
+ * name and deleting it.
+ *
+ * Created by russo on 6/9/16.
+ */
+public class DatabaseQueueSerializationTest extends AbstractTest {
+
+    /**
+     * Performs the setup every test starts with: fetches the Cassandra client from the
+     * injector, calls {@code getSession()} on it (result discarded - presumably this forces
+     * the session to be set up; TODO confirm), then returns the {@link QueueSerialization}
+     * instance from the injector.
+     */
+    private QueueSerialization prepareSerialization() {
+        CassandraClient client = getInjector().getInstance( CassandraClientImpl.class );
+        client.getSession();
+        return getInjector().getInstance( QueueSerialization.class );
+    }
+
+    /** Creates the queue definition used by these tests; only the name varies between tests. */
+    private static DatabaseQueue queueNamed( String name ) {
+        return new DatabaseQueue(name, "west", "west", 0L, 0, 0, "test_dlq");
+    }
+
+    /** Smoke test: writing a queue must complete without throwing. */
+    @Test
+    public void writeQueue(){
+
+        QueueSerialization serialization = prepareSerialization();
+
+        serialization.writeQueue( queueNamed( "test" ) );
+    }
+
+    /** A queue that has been written must be returned, equal to the original, when read by name. */
+    @Test
+    public void loadQueue(){
+
+        QueueSerialization serialization = prepareSerialization();
+        DatabaseQueue expected = queueNamed( "test1" );
+
+        serialization.writeQueue( expected );
+
+        assertEquals( expected, serialization.getQueue( "test1" ) );
+    }
+
+    /** After a written queue is deleted by name, reading it by name must return {@code null}. */
+    @Test
+    public void deleteQueue(){
+
+        QueueSerialization serialization = prepareSerialization();
+        DatabaseQueue expected = queueNamed( "test1" );
+
+        serialization.writeQueue( expected );
+
+        // make sure the queue really exists before deleting it
+        DatabaseQueue stored = serialization.getQueue( "test1" );
+        assertEquals( expected, stored );
+
+        serialization.deleteQueue( expected.getName() );
+
+        assertNull( serialization.getQueue( "test1" ) );
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
new file mode 100644
index 0000000..3152025
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+
+/**
+ * Tests the basic read/increment cycle of {@link ShardCounterSerialization}.
+ */
+public class ShardCounterSerializationTest extends AbstractTest {
+
+
+    /**
+     * Verifies that reading a counter that was never incremented throws
+     * {@link NotFoundException}, and that successive increments accumulate:
+     * after adding 10, 50 and 150 the counter reads 10, 60 and 210 respectively.
+     */
+    @Test
+    public void testBasicOperation() throws Exception {
+
+        // getSession() result is discarded - presumably called to force session setup; TODO confirm
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+        ShardCounterSerialization scs = getInjector().getInstance( ShardCounterSerialization.class );
+
+        // random suffix gives each run its own queue name
+        String queueName = "scst_queue_" + RandomStringUtils.randomAlphanumeric( 20 );
+        long shardId = 100L;
+
+        try {
+            scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId );
+            fail("Should have thrown NotFoundException");
+        } catch ( NotFoundException expected ) {
+            // pass
+        }
+
+        // each increment must be reflected in the running total
+        int expectedTotal = 0;
+        for ( int increment : new int[] { 10, 50, 150 } ) {
+            scs.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, increment );
+            expectedTotal += increment;
+            Assert.assertEquals( expectedTotal, scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) );
+        }
+    }
+
+}
\ No newline at end of file