You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/18 21:56:06 UTC
[06/50] incubator-usergrid git commit: SQS buffer tests pass.
SQS buffer tests pass.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b3e42ddb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b3e42ddb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b3e42ddb
Branch: refs/heads/USERGRID-460
Commit: b3e42ddb3ca4182f675a650afe56688b32472e81
Parents: 8d8eb06
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 22:31:55 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 22:31:55 2015 -0600
----------------------------------------------------------------------
.../index/impl/BufferQueueSQSImplTest.java | 53 ++++--
.../queue/impl/SQSQueueManagerImpl.java | 176 ++++++++++---------
2 files changed, 130 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b3e42ddb/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
index 4a4672e..6922c15 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
@@ -26,26 +26,27 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.test.UseModules;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexFig;
import org.apache.usergrid.persistence.index.IndexOperationMessage;
import org.apache.usergrid.persistence.index.guice.TestIndexModule;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.impl.UsergridAwsCredentialsProvider;
import com.google.inject.Inject;
import net.jcip.annotations.NotThreadSafe;
-import static org.junit.Assert.*;
-
-
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
@RunWith(EsRunner.class)
@@ -59,21 +60,23 @@ public class BufferQueueSQSImplTest {
public MigrationManagerRule migrationManagerRule;
@Inject
- private BufferQueueSQSImpl bufferQueueSQS;
+ public QueueManagerFactory queueManagerFactory;
+
+ @Inject
+ public IndexFig indexFig;
@Inject
- private EsIndexBufferConsumerImpl esIndexBufferConsumer;
+ public MapManagerFactory mapManagerFactory;
+ @Inject
+ public MetricsFactory metricsFactory;
- @Before
- public void stop() {
- esIndexBufferConsumer.stop();
- }
+ private BufferQueueSQSImpl bufferQueueSQS;
- @After
- public void after() {
- esIndexBufferConsumer.start();
+ @Before
+ public void setup(){
+ bufferQueueSQS = new BufferQueueSQSImpl( queueManagerFactory, indexFig, mapManagerFactory, metricsFactory );
}
@@ -82,6 +85,10 @@ public class BufferQueueSQSImplTest {
@Test
public void testMessageIndexing(){
+ final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+ assumeTrue( ugProvider.getCredentials().getAWSAccessKeyId() != null );
+ assumeTrue( ugProvider.getCredentials().getAWSSecretKey() != null );
+
final Map<String, Object> request1Data = new HashMap<String, Object>() {{put("test", "testval1");}};
final IndexRequest indexRequest1 = new IndexRequest( "testAlias1", "testType1", "testDoc1",request1Data );
@@ -112,9 +119,9 @@ public class BufferQueueSQSImplTest {
//now get it back
- final List<IndexOperationMessage> ops = bufferQueueSQS.take( 10, 20, TimeUnit.SECONDS );
+ final List<IndexOperationMessage> ops = getResults( 20, TimeUnit.SECONDS );
- assertTrue(ops.size() > 1);
+ assertTrue(ops.size() > 0);
final IndexOperationMessage returnedOperation = ops.get( 0 );
@@ -139,6 +146,18 @@ public class BufferQueueSQSImplTest {
}
+ private List<IndexOperationMessage> getResults(final long timeout, final TimeUnit timeUnit){
+ final long endTime = System.currentTimeMillis() + timeUnit.toMillis( timeout );
+
+ List<IndexOperationMessage> ops;
+
+ do{
+ ops = bufferQueueSQS.take( 10, 20, TimeUnit.SECONDS );
+ }while((ops == null || ops.size() == 0 ) && System.currentTimeMillis() < endTime);
+
+ return ops;
+ }
+
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b3e42ddb/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index e2c5c1e..a78fc80 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -17,85 +17,106 @@
*/
package org.apache.usergrid.persistence.queue.impl;
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.SDKGlobalConfiguration;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.AWSCredentialsProvider;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.queue.Queue;
+import org.apache.usergrid.persistence.queue.QueueFig;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.QueueScope;
+
+import com.amazonaws.AbortedException;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.*;
+import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
+import com.amazonaws.services.sqs.model.CreateQueueRequest;
+import com.amazonaws.services.sqs.model.CreateQueueResult;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
+import com.amazonaws.services.sqs.model.DeleteMessageRequest;
+import com.amazonaws.services.sqs.model.GetQueueUrlResult;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.MessageAttributeValue;
+import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
+import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
+import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.inject.Inject;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.inject.assistedinject.Assisted;
-import org.apache.commons.lang.StringUtils;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import org.apache.usergrid.persistence.queue.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
public class SQSQueueManagerImpl implements QueueManager {
private static final Logger LOG = LoggerFactory.getLogger(SQSQueueManagerImpl.class);
- private final AmazonSQSClient sqs;
+
private final QueueScope scope;
private ObjectMapper mapper;
+ private final QueueFig fig;
+ private final AmazonSQSClient sqs;
+
private static SmileFactory smileFactory = new SmileFactory();
- private LoadingCache<SqsLoader, Queue> urlMap = CacheBuilder.newBuilder()
- .maximumSize(1000)
- .build(new CacheLoader<SqsLoader, Queue>() {
- @Override
- public Queue load(SqsLoader queueLoader) throws Exception {
- Queue queue = null;
- try {
- GetQueueUrlResult result = sqs.getQueueUrl(queueLoader.getKey());
- queue = new Queue(result.getQueueUrl());
- } catch (QueueDoesNotExistException queueDoesNotExistException) {
- queue = null;
- } catch (Exception e) {
- LOG.error("failed to get queue from service", e);
- throw e;
- }
- if (queue == null) {
- String name = queueLoader.getKey();
- CreateQueueRequest createQueueRequest = new CreateQueueRequest()
- .withQueueName(name);
- CreateQueueResult result = sqs.createQueue(createQueueRequest);
- String url = result.getQueueUrl();
- queue = new Queue(url);
- LOG.info("Created queue with url {}", url);
- }
- return queue;
- }
- }
- );
+ private LoadingCache<String, Queue> urlMap = CacheBuilder.newBuilder()
+ .maximumSize( 1000 )
+ .build( new CacheLoader<String, Queue>() {
+ @Override
+ public Queue load( String queueLoader ) throws Exception {
+
+ //the amazon client is not thread safe, we need to create one per queue
+ Queue queue = null;
+ try {
+ GetQueueUrlResult result = sqs.getQueueUrl( queueLoader );
+ queue = new Queue( result.getQueueUrl() );
+ }catch ( QueueDoesNotExistException queueDoesNotExistException ) {
+ //no op, swallow
+
+ }
+ catch ( Exception e ) {
+ LOG.error( "failed to get queue from service", e );
+ throw e;
+ }
+ if ( queue == null ) {
+ CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName( queueLoader );
+ CreateQueueResult result = sqs.createQueue( createQueueRequest );
+ String url = result.getQueueUrl();
+ queue = new Queue( url );
+ LOG.info( "Created queue with url {}", url );
+ }
+ return queue;
+ }
+ } );
+
@Inject
- public SQSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig){
+ public SQSQueueManagerImpl( @Assisted QueueScope scope, QueueFig fig ){
this.scope = scope;
+ this.fig = fig;
try {
- UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
- this.sqs = new AmazonSQSClient(ugProvider.getCredentials());
- Regions regions = Regions.fromName(fig.getRegion());
- Region region = Region.getRegion(regions);
- sqs.setRegion(region);
+
smileFactory.delegateToTextual(true);
mapper = new ObjectMapper( smileFactory );
//pretty print, disabling for speed
// mapper.enable(SerializationFeature.INDENT_OUTPUT);
mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
+
+ sqs = createClient();
+
} catch ( Exception e ) {
throw new RuntimeException("Error setting up mapper", e);
}
@@ -109,7 +130,7 @@ public class SQSQueueManagerImpl implements QueueManager {
public Queue getQueue() {
try {
- Queue queue = urlMap.get(new SqsLoader(getName(),sqs));
+ Queue queue = urlMap.get(getName());
return queue;
} catch (ExecutionException ee) {
throw new RuntimeException(ee);
@@ -228,38 +249,29 @@ public class SQSQueueManagerImpl implements QueueManager {
return mapper.writeValueAsString(o);
}
- public class SqsLoader {
- private final String key;
-
- public SqsLoader(String key, AmazonSQSClient client) {
- this.key = key;
- }
-
- public String getKey() {
- return key;
- }
+ /**
+ * Get the region
+ * @return
+ */
+ private Region getRegion() {
+ Regions regions = Regions.fromName( fig.getRegion() );
+ Region region = Region.getRegion( regions );
+ return region;
+ }
- @Override
- public boolean equals(Object o){
- if(o instanceof SqsLoader){
- SqsLoader loader = (SqsLoader)o;
- return loader.getKey().equals(this.getKey());
- }
- return false;
- }
- @Override
- public int hashCode() {
- int result = getKey().hashCode();
- return result;
- }
+ /**
+ * Create the SQS client for the specified settings
+ */
+ private AmazonSQSClient createClient() {
+ final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+ final AmazonSQSClient sqs = new AmazonSQSClient( ugProvider.getCredentials() );
+ final Region region = getRegion();
+ sqs.setRegion( region );
+ return sqs;
+ }
- @Override
- public String toString() {
- return getKey();
- }
- }
}