You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text version.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/18 21:56:06 UTC

[06/50] incubator-usergrid git commit: SQS buffer tests pass.

SQS buffer tests pass.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b3e42ddb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b3e42ddb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b3e42ddb

Branch: refs/heads/USERGRID-460
Commit: b3e42ddb3ca4182f675a650afe56688b32472e81
Parents: 8d8eb06
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 22:31:55 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 22:31:55 2015 -0600

----------------------------------------------------------------------
 .../index/impl/BufferQueueSQSImplTest.java      |  53 ++++--
 .../queue/impl/SQSQueueManagerImpl.java         | 176 ++++++++++---------
 2 files changed, 130 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b3e42ddb/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
index 4a4672e..6922c15 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
@@ -26,26 +26,27 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.test.UseModules;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.IndexOperationMessage;
 import org.apache.usergrid.persistence.index.guice.TestIndexModule;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.impl.UsergridAwsCredentialsProvider;
 
 import com.google.inject.Inject;
 
 import net.jcip.annotations.NotThreadSafe;
 
-import static org.junit.Assert.*;
-
-
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
 
 
 @RunWith(EsRunner.class)
@@ -59,21 +60,23 @@ public class BufferQueueSQSImplTest {
     public MigrationManagerRule migrationManagerRule;
 
     @Inject
-    private BufferQueueSQSImpl bufferQueueSQS;
+    public QueueManagerFactory queueManagerFactory;
+
+    @Inject
+    public IndexFig indexFig;
 
     @Inject
-    private EsIndexBufferConsumerImpl esIndexBufferConsumer;
+    public MapManagerFactory mapManagerFactory;
 
+    @Inject
+    public MetricsFactory metricsFactory;
 
-    @Before
-    public void stop() {
-        esIndexBufferConsumer.stop();
-    }
 
+    private BufferQueueSQSImpl bufferQueueSQS;
 
-    @After
-    public void after() {
-        esIndexBufferConsumer.start();
+    @Before
+    public void setup(){
+        bufferQueueSQS = new BufferQueueSQSImpl( queueManagerFactory, indexFig, mapManagerFactory, metricsFactory );
     }
 
 
@@ -82,6 +85,10 @@ public class BufferQueueSQSImplTest {
     @Test
     public void testMessageIndexing(){
 
+        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+        assumeTrue( ugProvider.getCredentials().getAWSAccessKeyId() != null );
+        assumeTrue( ugProvider.getCredentials().getAWSSecretKey() != null );
+
         final Map<String, Object> request1Data  = new HashMap<String, Object>() {{put("test", "testval1");}};
         final IndexRequest indexRequest1 =  new IndexRequest( "testAlias1", "testType1", "testDoc1",request1Data );
 
@@ -112,9 +119,9 @@ public class BufferQueueSQSImplTest {
 
         //now get it back
 
-        final List<IndexOperationMessage> ops = bufferQueueSQS.take( 10,  20, TimeUnit.SECONDS );
+        final List<IndexOperationMessage> ops = getResults( 20, TimeUnit.SECONDS );
 
-        assertTrue(ops.size() > 1);
+        assertTrue(ops.size() > 0);
 
         final IndexOperationMessage returnedOperation = ops.get( 0 );
 
@@ -139,6 +146,18 @@ public class BufferQueueSQSImplTest {
 
     }
 
+    private List<IndexOperationMessage> getResults(final long timeout, final TimeUnit timeUnit){
+        final long endTime = System.currentTimeMillis() + timeUnit.toMillis( timeout );
+
+        List<IndexOperationMessage> ops;
+
+        do{
+            ops = bufferQueueSQS.take( 10,  20, TimeUnit.SECONDS );
+        }while((ops == null || ops.size() == 0 ) &&  System.currentTimeMillis() < endTime);
+
+        return ops;
+    }
+
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b3e42ddb/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index e2c5c1e..a78fc80 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -17,85 +17,106 @@
  */
 package org.apache.usergrid.persistence.queue.impl;
 
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.SDKGlobalConfiguration;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.AWSCredentialsProvider;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.queue.Queue;
+import org.apache.usergrid.persistence.queue.QueueFig;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.QueueScope;
+
+import com.amazonaws.AbortedException;
 import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.*;
+import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
+import com.amazonaws.services.sqs.model.CreateQueueRequest;
+import com.amazonaws.services.sqs.model.CreateQueueResult;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
+import com.amazonaws.services.sqs.model.DeleteMessageRequest;
+import com.amazonaws.services.sqs.model.GetQueueUrlResult;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.MessageAttributeValue;
+import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
+import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
+import com.amazonaws.services.sqs.model.SendMessageRequest;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.inject.Inject;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.google.inject.assistedinject.Assisted;
-import org.apache.commons.lang.StringUtils;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import org.apache.usergrid.persistence.queue.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
 public class SQSQueueManagerImpl implements QueueManager {
     private static final Logger LOG = LoggerFactory.getLogger(SQSQueueManagerImpl.class);
 
-    private  final AmazonSQSClient sqs;
+
     private  final QueueScope scope;
     private  ObjectMapper mapper;
+    private final QueueFig fig;
+    private final AmazonSQSClient sqs;
+
     private static SmileFactory smileFactory = new SmileFactory();
 
-    private LoadingCache<SqsLoader, Queue> urlMap = CacheBuilder.newBuilder()
-            .maximumSize(1000)
-            .build(new CacheLoader<SqsLoader, Queue>() {
-                       @Override
-                       public Queue load(SqsLoader queueLoader) throws Exception {
-                           Queue queue = null;
-                           try {
-                               GetQueueUrlResult result = sqs.getQueueUrl(queueLoader.getKey());
-                               queue = new Queue(result.getQueueUrl());
-                           } catch (QueueDoesNotExistException queueDoesNotExistException) {
-                               queue = null;
-                           } catch (Exception e) {
-                               LOG.error("failed to get queue from service", e);
-                               throw e;
-                           }
-                           if (queue == null) {
-                               String name = queueLoader.getKey();
-                               CreateQueueRequest createQueueRequest = new CreateQueueRequest()
-                                       .withQueueName(name);
-                               CreateQueueResult result = sqs.createQueue(createQueueRequest);
-                               String url = result.getQueueUrl();
-                               queue = new Queue(url);
-                               LOG.info("Created queue with url {}", url);
-                           }
-                           return queue;
-                       }
-                   }
-            );
+    private LoadingCache<String, Queue> urlMap = CacheBuilder.newBuilder()
+            .maximumSize( 1000 )
+            .build( new CacheLoader<String, Queue>() {
+                @Override
+                public Queue load( String queueLoader ) throws Exception {
+
+                    //the amazon client is not thread safe, we need to create one per queue
+                    Queue queue = null;
+                    try {
+                        GetQueueUrlResult result = sqs.getQueueUrl( queueLoader );
+                        queue = new Queue( result.getQueueUrl() );
+                    }catch ( QueueDoesNotExistException queueDoesNotExistException ) {
+                        //no op, swallow
+
+                    }
+                    catch ( Exception e ) {
+                        LOG.error( "failed to get queue from service", e );
+                        throw e;
+                    }
+                    if ( queue == null ) {
+                        CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName( queueLoader );
+                        CreateQueueResult result = sqs.createQueue( createQueueRequest );
+                        String url = result.getQueueUrl();
+                        queue = new Queue( url );
+                        LOG.info( "Created queue with url {}", url );
+                    }
+                    return queue;
+                }
+            } );
+
 
     @Inject
-    public SQSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig){
+    public SQSQueueManagerImpl( @Assisted QueueScope scope, QueueFig fig ){
         this.scope = scope;
+        this.fig = fig;
         try {
-            UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
-            this.sqs = new AmazonSQSClient(ugProvider.getCredentials());
-            Regions regions = Regions.fromName(fig.getRegion());
-            Region region = Region.getRegion(regions);
-            sqs.setRegion(region);
+
             smileFactory.delegateToTextual(true);
             mapper = new ObjectMapper( smileFactory );
             //pretty print, disabling for speed
 //            mapper.enable(SerializationFeature.INDENT_OUTPUT);
             mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
+
+            sqs = createClient();
+
         } catch ( Exception e ) {
             throw new RuntimeException("Error setting up mapper", e);
         }
@@ -109,7 +130,7 @@ public class SQSQueueManagerImpl implements QueueManager {
 
     public Queue getQueue() {
         try {
-            Queue queue = urlMap.get(new SqsLoader(getName(),sqs));
+            Queue queue = urlMap.get(getName());
             return queue;
         } catch (ExecutionException ee) {
             throw new RuntimeException(ee);
@@ -228,38 +249,29 @@ public class SQSQueueManagerImpl implements QueueManager {
         return mapper.writeValueAsString(o);
     }
 
-    public class SqsLoader {
-        private final String key;
-
-        public SqsLoader(String key, AmazonSQSClient client) {
-            this.key = key;
-        }
-
 
-        public String getKey() {
-            return key;
-        }
+    /**
+     * Get the region
+     * @return
+     */
+    private Region getRegion() {
+        Regions regions = Regions.fromName( fig.getRegion() );
+        Region region = Region.getRegion( regions );
+        return region;
+    }
 
-        @Override
-        public boolean equals(Object o){
-            if(o instanceof  SqsLoader){
-                SqsLoader loader = (SqsLoader)o;
-                return loader.getKey().equals(this.getKey());
-            }
-            return false;
-        }
 
-        @Override
-        public int hashCode() {
-            int result = getKey().hashCode();
-            return result;
-        }
+    /**
+     * Create the SQS client for the specified settings
+     */
+    private AmazonSQSClient createClient() {
+        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+        final AmazonSQSClient sqs = new AmazonSQSClient( ugProvider.getCredentials() );
+        final Region region = getRegion();
+        sqs.setRegion( region );
 
+        return sqs;
+    }
 
-        @Override
-        public String toString() {
-            return getKey();
-        }
 
-    }
 }