You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/02/28 06:41:53 UTC

usergrid git commit: Add connection and socket timeouts for the AWS SNS and SQS clients. Separate queue status into its own endpoint.

Repository: usergrid
Updated Branches:
  refs/heads/release-2.1.1 97fae9446 -> 91739c60f


Add connection and socket timeouts for the AWS SNS and SQS clients.  Separate queue status into its own endpoint.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/91739c60
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/91739c60
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/91739c60

Branch: refs/heads/release-2.1.1
Commit: 91739c60ffa8d7b2db25fa9e7eb731cbfb81863f
Parents: 97fae94
Author: Michael Russo <mr...@apigee.com>
Authored: Sat Feb 27 21:41:50 2016 -0800
Committer: Michael Russo <mr...@apigee.com>
Committed: Sat Feb 27 21:41:50 2016 -0800

----------------------------------------------------------------------
 .../asyncevents/AsyncEventService.java          |  7 +++++
 .../asyncevents/AsyncEventServiceImpl.java      |  6 ++++
 .../usergrid/persistence/queue/QueueFig.java    | 10 +++++++
 .../queue/impl/SNSQueueManagerImpl.java         | 27 ++++++++++++-----
 .../org/apache/usergrid/rest/RootResource.java  | 31 +++++++++++++++++---
 5 files changed, 69 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/91739c60/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 288fb12..1abf83f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -88,6 +88,13 @@ public interface AsyncEventService extends ReIndexAction {
      */
     long getQueueDepth();
 
+    /**
+     * Returns the simple class name of the queue manager implementation currently in use.
+     * @return the simple class name of the active queue manager
+     */
+    String getQueueManagerClass();
+
+
 
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/91739c60/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 7a71410..b1d0805 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -825,5 +825,11 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
     }
 
+    public String getQueueManagerClass() {
+
+        return queue.getClass().getSimpleName();
+
+    }
+
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/91739c60/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
index 88ad3ff..7757d58 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -86,4 +86,14 @@ public interface QueueFig extends GuicyFig {
     @Key( "usergrid.queue.localquorum.timeout")
     @Default("3000") // 3 seconds
     int getLocalQuorumTimeout();
+
+    @Key( "usergrid.queue.client.connection.timeout")
+    @Default( "1000" ) // 1 second
+    int getQueueClientConnectionTimeout();
+
+    @Key( "usergrid.queue.client.socket.timeout")
+    @Default( "3000" ) // 3 seconds
+    int getQueueClientSocketTimeout();
+
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/91739c60/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 4028d46..f1d8c5a 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
+import com.amazonaws.ClientConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,6 +88,7 @@ public class SNSQueueManagerImpl implements QueueManager {
     private final QueueFig fig;
     private final ClusterFig clusterFig;
     private final CassandraFig cassandraFig;
+    private final ClientConfiguration clientConfiguration;
     private final AmazonSQSClient sqs;
     private final AmazonSNSClient sns;
     private final AmazonSNSAsyncClient snsAsync;
@@ -165,6 +167,11 @@ public class SNSQueueManagerImpl implements QueueManager {
 
         final Region region = getRegion();
 
+        this.clientConfiguration = new ClientConfiguration()
+            .withConnectionTimeout(queueFig.getQueueClientConnectionTimeout())
+            .withSocketTimeout(queueFig.getQueueClientSocketTimeout())
+            .withGzip(true);
+
         try {
             sqs = createSQSClient( region );
             sns = createSNSClient( region );
@@ -329,10 +336,10 @@ public class SNSQueueManagerImpl implements QueueManager {
      */
 
     private AmazonSNSAsyncClient createAsyncSNSClient( final Region region, final ExecutorService executor ) {
-        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
-
 
-        final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient( ugProvider.getCredentials(), executor );
+        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+        final AmazonSNSAsyncClient sns =
+            new AmazonSNSAsyncClient( ugProvider.getCredentials(), clientConfiguration, executor );
 
         sns.setRegion( region );
 
@@ -344,9 +351,10 @@ public class SNSQueueManagerImpl implements QueueManager {
      * Create the async sqs client
      */
     private AmazonSQSAsyncClient createAsyncSQSClient( final Region region, final ExecutorService executor ) {
-        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
 
-        final AmazonSQSAsyncClient sqs = new AmazonSQSAsyncClient( ugProvider.getCredentials(), executor );
+        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+        final AmazonSQSAsyncClient sqs =
+            new AmazonSQSAsyncClient( ugProvider.getCredentials(),clientConfiguration,  executor );
 
         sqs.setRegion( region );
 
@@ -358,9 +366,10 @@ public class SNSQueueManagerImpl implements QueueManager {
      * The Synchronous SNS client is used for creating topics and subscribing queues.
      */
     private AmazonSNSClient createSNSClient( final Region region ) {
-        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
 
-        final AmazonSNSClient sns = new AmazonSNSClient( ugProvider.getCredentials() );
+        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+        final AmazonSNSClient sns =
+            new AmazonSNSClient( ugProvider.getCredentials(), clientConfiguration );
 
         sns.setRegion( region );
 
@@ -663,8 +672,10 @@ public class SNSQueueManagerImpl implements QueueManager {
      * Create the SQS client for the specified settings
      */
     private AmazonSQSClient createSQSClient( final Region region ) {
+
         final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
-        final AmazonSQSClient sqs = new AmazonSQSClient( ugProvider.getCredentials() );
+        final AmazonSQSClient sqs =
+            new AmazonSQSClient( ugProvider.getCredentials(), clientConfiguration );
 
         sqs.setRegion( region );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/91739c60/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
index b7118a3..75ed567 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
@@ -174,9 +174,6 @@ public class RootResource extends AbstractContextResource implements MetricProce
 
         ApiResponse response = createApiResponse();
 
-        AsyncEventService eventService = injector.getInstance(AsyncEventService.class);
-
-
         if ( !ignoreError ) {
 
             if ( !emf.getEntityStoreHealth().equals( Health.GREEN )) {
@@ -202,7 +199,6 @@ public class RootResource extends AbstractContextResource implements MetricProce
 
         // Core Persistence Query Index module status for Management App Index
         node.put( "managementAppIndexStatus", emf.getIndexHealth().toString() );
-        node.put( "queueDepth", eventService.getQueueDepth() );
 
 
         dumpMetrics(node);
@@ -226,6 +222,33 @@ public class RootResource extends AbstractContextResource implements MetricProce
         return response.build();
     }
 
+    @GET
+    @Path("/status/queue")
+    @JSONP
+    @Produces({MediaType.APPLICATION_JSON, "application/javascript"})
+    public ApiResponse getQueueDepth(){
+
+        ApiResponse response = createApiResponse();
+        AsyncEventService eventService = injector.getInstance(AsyncEventService.class);
+
+        ObjectNode node = JsonNodeFactory.instance.objectNode();
+
+        String provider = "LOCAL";
+        String queueManagerClass = eventService.getQueueManagerClass();
+
+        if(queueManagerClass.contains("SNS") || queueManagerClass.contains("SQS")){
+            provider = "AWS";
+        }
+
+        node.put( "provider", provider );
+        node.put( "depth", eventService.getQueueDepth() );
+
+        response.setProperty( "status", node );
+        return response;
+
+    }
+
+
 
     private void dumpMetrics( ObjectNode node ) {
         MetricsRegistry registry = Metrics.defaultRegistry();