You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by md...@apache.org on 2017/12/11 22:50:31 UTC

usergrid git commit: update SNS topic permissions for SQS queues when necessary

Repository: usergrid
Updated Branches:
  refs/heads/hotfix_20171205 71169f89a -> 2b3573377


update SNS topic permissions for SQS queues when necessary


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2b357337
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2b357337
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2b357337

Branch: refs/heads/hotfix_20171205
Commit: 2b3573377a96eda1341fd61568195476744aabd0
Parents: 71169f8
Author: Mike Dunker <md...@google.com>
Authored: Mon Dec 11 14:49:57 2017 -0800
Committer: Mike Dunker <md...@google.com>
Committed: Mon Dec 11 14:49:57 2017 -0800

----------------------------------------------------------------------
 .../queue/util/AmazonNotificationUtils.java     | 135 ++++++++++++++-----
 1 file changed, 101 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/2b357337/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
index 56bef91..b2b209c 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
@@ -1,21 +1,16 @@
 package org.apache.usergrid.persistence.queue.util;
 
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
+import com.amazonaws.auth.policy.*;
+import com.amazonaws.auth.policy.conditions.ArnCondition;
+import com.amazonaws.services.sqs.model.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.queue.LegacyQueueFig;
 
-import com.amazonaws.auth.policy.Condition;
-import com.amazonaws.auth.policy.Policy;
-import com.amazonaws.auth.policy.Principal;
-import com.amazonaws.auth.policy.Resource;
-import com.amazonaws.auth.policy.Statement;
 import com.amazonaws.auth.policy.actions.SQSActions;
 import com.amazonaws.auth.policy.conditions.ConditionFactory;
 import com.amazonaws.services.sns.AmazonSNSClient;
@@ -23,13 +18,6 @@ import com.amazonaws.services.sns.model.CreateTopicResult;
 import com.amazonaws.services.sns.model.ListTopicsResult;
 import com.amazonaws.services.sns.model.Topic;
 import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.CreateQueueRequest;
-import com.amazonaws.services.sqs.model.CreateQueueResult;
-import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
-import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
-import com.amazonaws.services.sqs.model.GetQueueUrlResult;
-import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
-import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
 
 
 /**
@@ -85,34 +73,113 @@ public class AmazonNotificationUtils {
     public static void setQueuePermissionsToReceive( final AmazonSQSClient sqs, final String queueUrl,
                                                      final List<String> topicARNs ) throws Exception {
 
-        String queueARN = getQueueArnByUrl( sqs, queueUrl );
-
-        Statement statement = new Statement( Statement.Effect.Allow ).withActions( SQSActions.SendMessage )
-                                                                     .withPrincipals( new Principal( "*" ) )
-                                                                     .withResources( new Resource( queueARN ) );
+        // retrieve queue ARN and policy
+        List<String> sqsAttrNames = Arrays.asList(QueueAttributeName.QueueArn.toString(),
+            QueueAttributeName.Policy.toString());
+        GetQueueAttributesRequest getQueueAttributesRequest =
+            new GetQueueAttributesRequest( queueUrl ).withAttributeNames( sqsAttrNames );
+        GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes( getQueueAttributesRequest );
+        Map<String, String> sqsAttributeMap = queueAttributesResult.getAttributes();
+        String queueARN = sqsAttributeMap.get(QueueAttributeName.QueueArn.toString());
+        String policyJson = sqsAttributeMap.get(QueueAttributeName.Policy.toString());
+
+        // cannot send ARN in settings update, so remove it
+        sqsAttributeMap.remove(QueueAttributeName.QueueArn.toString());
+
+        // get existing policy from JSON
+        Policy policy = policyJson != null && policyJson.length() > 0 ? Policy.fromJson(policyJson) : new Policy();
+
+        // see if permissions already exist, and find ArnLike conditions
+        boolean matchingConditionFound = false;
+        boolean policyEdited = false;
+        for (Statement statement : policy.getStatements()) {
+            logger.info("statement id: {}, effect: {}, action: {}, resources:{}",
+                statement.getId(), statement.getEffect().name(),
+                statement.getActions().get(0).getActionName(),
+                statement.getResources().get(0).getId());
+
+            // must be Allow effect
+            if (! statement.getEffect().name().equals(Statement.Effect.Allow.name())) {
+                continue;
+            }
 
-        List<Condition> conditions = new ArrayList<>();
+            // must be SendMessage action
+            boolean actionFound = false;
+            for (Action action : statement.getActions()) {
+                // do lower case comparison, since UI adds SQS.SendMessage but SDK uses sqs.SendMessage
+                if (action.getActionName().toLowerCase().equals(SQSActions.SendMessage.getActionName().toLowerCase())) {
+                    actionFound = true;
+                    break;
+                }
+            }
+            if (!actionFound) {
+                continue;
+            }
 
-        for ( String topicARN : topicARNs ) {
+            // must be same queue resource
+            boolean queueResourceFound = false;
+            for (Resource resource : statement.getResources()) {
+                if (resource.getId().equals(queueARN)) {
+                    queueResourceFound = true;
+                    break;
+                }
+            }
+            if (!queueResourceFound) {
+                continue;
+            }
 
-            conditions.add( ConditionFactory.newSourceArnCondition( topicARN ) );
+            // found matching statement, check conditions for source ARN
+            for (Condition condition : statement.getConditions()) {
+                if (logger.isTraceEnabled()) {
+                    logger.trace("condition type: {}, conditionKey: {}", condition.getType(), condition.getConditionKey());
+                }
+                if (condition.getType().equals(ArnCondition.ArnComparisonType.ArnLike.name()) &&
+                    condition.getConditionKey().equals(ConditionFactory.SOURCE_ARN_CONDITION_KEY)) {
+                    matchingConditionFound = true;
+                    for (String topicARN : topicARNs) {
+                        if (! condition.getValues().contains(topicARN)) {
+                            // topic doesn't exist, add it
+                            policyEdited = true;
+                            condition.getValues().add(topicARN);
+                        }
+                    }
+                }
+            }
         }
-        statement.setConditions( conditions );
 
-        Policy policy = new Policy( "SubscriptionPermission" ).withStatements( statement );
+        if (!matchingConditionFound) {
+            // never found ArnLike SourceArn condition, need to add a statement
+            List<Condition> conditions = new ArrayList<>();
 
+            for (String topicARN : topicARNs) {
 
-        final Map<String, String> queueAttributes = new HashMap<>();
-        queueAttributes.put( "Policy", policy.toJson() );
+                conditions.add(ConditionFactory.newSourceArnCondition(topicARN));
+            }
 
-        SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest( queueUrl, queueAttributes );
+            Statement statement = new Statement(Statement.Effect.Allow)
+                .withPrincipals(Principal.AllUsers)
+                .withActions(SQSActions.SendMessage)
+                .withResources(new Resource(queueARN));
+            statement.setConditions(conditions);
 
-        try {
-            sqs.setQueueAttributes( queueAttributesRequest );
+            policy.getStatements().add(statement);
+            policyEdited = true;
         }
-        catch ( Exception e ) {
-            logger.error( "Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN,
-                topicARNs.toString(), e );
+
+        if (policyEdited) {
+            sqsAttributeMap.put(QueueAttributeName.Policy.toString(), policy.toJson());
+
+            // log if permissions are being updated
+            logger.info("updating permissions for queueARN: {}, new policy: {}", queueARN, policy.toJson());
+
+            SetQueueAttributesRequest setQueueAttributesRequest = new SetQueueAttributesRequest(queueUrl, sqsAttributeMap);
+
+            try {
+                sqs.setQueueAttributes(setQueueAttributesRequest);
+            } catch (Exception e) {
+                logger.error("Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN,
+                    topicARNs.toString(), e);
+            }
         }
     }