You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by md...@apache.org on 2017/12/11 22:50:31 UTC
usergrid git commit: update SNS topic permissions for SQS queues when necessary
Repository: usergrid
Updated Branches:
refs/heads/hotfix_20171205 71169f89a -> 2b3573377
update SNS topic permissions for SQS queues when necessary
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2b357337
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2b357337
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2b357337
Branch: refs/heads/hotfix_20171205
Commit: 2b3573377a96eda1341fd61568195476744aabd0
Parents: 71169f8
Author: Mike Dunker <md...@google.com>
Authored: Mon Dec 11 14:49:57 2017 -0800
Committer: Mike Dunker <md...@google.com>
Committed: Mon Dec 11 14:49:57 2017 -0800
----------------------------------------------------------------------
.../queue/util/AmazonNotificationUtils.java | 135 ++++++++++++++-----
1 file changed, 101 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2b357337/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
index 56bef91..b2b209c 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
@@ -1,21 +1,16 @@
package org.apache.usergrid.persistence.queue.util;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import com.amazonaws.auth.policy.*;
+import com.amazonaws.auth.policy.conditions.ArnCondition;
+import com.amazonaws.services.sqs.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.queue.LegacyQueueFig;
-import com.amazonaws.auth.policy.Condition;
-import com.amazonaws.auth.policy.Policy;
-import com.amazonaws.auth.policy.Principal;
-import com.amazonaws.auth.policy.Resource;
-import com.amazonaws.auth.policy.Statement;
import com.amazonaws.auth.policy.actions.SQSActions;
import com.amazonaws.auth.policy.conditions.ConditionFactory;
import com.amazonaws.services.sns.AmazonSNSClient;
@@ -23,13 +18,6 @@ import com.amazonaws.services.sns.model.CreateTopicResult;
import com.amazonaws.services.sns.model.ListTopicsResult;
import com.amazonaws.services.sns.model.Topic;
import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.CreateQueueRequest;
-import com.amazonaws.services.sqs.model.CreateQueueResult;
-import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
-import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
-import com.amazonaws.services.sqs.model.GetQueueUrlResult;
-import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
-import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
/**
@@ -85,34 +73,113 @@ public class AmazonNotificationUtils {
public static void setQueuePermissionsToReceive( final AmazonSQSClient sqs, final String queueUrl,
final List<String> topicARNs ) throws Exception {
- String queueARN = getQueueArnByUrl( sqs, queueUrl );
-
- Statement statement = new Statement( Statement.Effect.Allow ).withActions( SQSActions.SendMessage )
- .withPrincipals( new Principal( "*" ) )
- .withResources( new Resource( queueARN ) );
+ // retrieve queue ARN and policy
+ List<String> sqsAttrNames = Arrays.asList(QueueAttributeName.QueueArn.toString(),
+ QueueAttributeName.Policy.toString());
+ GetQueueAttributesRequest getQueueAttributesRequest =
+ new GetQueueAttributesRequest( queueUrl ).withAttributeNames( sqsAttrNames );
+ GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes( getQueueAttributesRequest );
+ Map<String, String> sqsAttributeMap = queueAttributesResult.getAttributes();
+ String queueARN = sqsAttributeMap.get(QueueAttributeName.QueueArn.toString());
+ String policyJson = sqsAttributeMap.get(QueueAttributeName.Policy.toString());
+
+ // cannot send ARN in settings update, so remove it
+ sqsAttributeMap.remove(QueueAttributeName.QueueArn.toString());
+
+ // get existing policy from JSON
+ Policy policy = policyJson != null && policyJson.length() > 0 ? Policy.fromJson(policyJson) : new Policy();
+
+ // see if permissions already exist, and find ArnLike conditions
+ boolean matchingConditionFound = false;
+ boolean policyEdited = false;
+ for (Statement statement : policy.getStatements()) {
+ logger.info("statement id: {}, effect: {}, action: {}, resources:{}",
+ statement.getId(), statement.getEffect().name(),
+ statement.getActions().get(0).getActionName(),
+ statement.getResources().get(0).getId());
+
+ // must be Allow effect
+ if (! statement.getEffect().name().equals(Statement.Effect.Allow.name())) {
+ continue;
+ }
- List<Condition> conditions = new ArrayList<>();
+ // must be SendMessage action
+ boolean actionFound = false;
+ for (Action action : statement.getActions()) {
+ // do lower case comparison, since UI adds SQS.SendMessage but SDK uses sqs.SendMessage
+ if (action.getActionName().toLowerCase().equals(SQSActions.SendMessage.getActionName().toLowerCase())) {
+ actionFound = true;
+ break;
+ }
+ }
+ if (!actionFound) {
+ continue;
+ }
- for ( String topicARN : topicARNs ) {
+ // must be same queue resource
+ boolean queueResourceFound = false;
+ for (Resource resource : statement.getResources()) {
+ if (resource.getId().equals(queueARN)) {
+ queueResourceFound = true;
+ break;
+ }
+ }
+ if (!queueResourceFound) {
+ continue;
+ }
- conditions.add( ConditionFactory.newSourceArnCondition( topicARN ) );
+ // found matching statement, check conditions for source ARN
+ for (Condition condition : statement.getConditions()) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("condition type: {}, conditionKey: {}", condition.getType(), condition.getConditionKey());
+ }
+ if (condition.getType().equals(ArnCondition.ArnComparisonType.ArnLike.name()) &&
+ condition.getConditionKey().equals(ConditionFactory.SOURCE_ARN_CONDITION_KEY)) {
+ matchingConditionFound = true;
+ for (String topicARN : topicARNs) {
+ if (! condition.getValues().contains(topicARN)) {
+ // topic ARN is not yet listed in this condition's values, add it
+ policyEdited = true;
+ condition.getValues().add(topicARN);
+ }
+ }
+ }
+ }
}
- statement.setConditions( conditions );
- Policy policy = new Policy( "SubscriptionPermission" ).withStatements( statement );
+ if (!matchingConditionFound) {
+ // never found ArnLike SourceArn condition, need to add a statement
+ List<Condition> conditions = new ArrayList<>();
+ for (String topicARN : topicARNs) {
- final Map<String, String> queueAttributes = new HashMap<>();
- queueAttributes.put( "Policy", policy.toJson() );
+ conditions.add(ConditionFactory.newSourceArnCondition(topicARN));
+ }
- SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest( queueUrl, queueAttributes );
+ Statement statement = new Statement(Statement.Effect.Allow)
+ .withPrincipals(Principal.AllUsers)
+ .withActions(SQSActions.SendMessage)
+ .withResources(new Resource(queueARN));
+ statement.setConditions(conditions);
- try {
- sqs.setQueueAttributes( queueAttributesRequest );
+ policy.getStatements().add(statement);
+ policyEdited = true;
}
- catch ( Exception e ) {
- logger.error( "Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN,
- topicARNs.toString(), e );
+
+ if (policyEdited) {
+ sqsAttributeMap.put(QueueAttributeName.Policy.toString(), policy.toJson());
+
+ // log if permissions are being updated
+ logger.info("updating permissions for queueARN: {}, new policy: {}", queueARN, policy.toJson());
+
+ SetQueueAttributesRequest setQueueAttributesRequest = new SetQueueAttributesRequest(queueUrl, sqsAttributeMap);
+
+ try {
+ sqs.setQueueAttributes(setQueueAttributesRequest);
+ } catch (Exception e) {
+ logger.error("Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN,
+ topicARNs.toString(), e);
+ }
}
}