Posted to dev@usergrid.apache.org by peterj99a <gi...@git.apache.org> on 2017/09/27 15:21:41 UTC

[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client

GitHub user peterj99a opened a pull request:

    https://github.com/apache/usergrid/pull/575

    Allow submission to SNS/SQS via sync client

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/peterj99a/usergrid master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/usergrid/pull/575.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #575
    
----
commit 9a15108924b29997c9ca440b467ab8006a8f8acb
Author: Peter Johnson <pj...@apigee.com>
Date:   2017-09-27T15:19:20Z

    Allow submission to SNS/SQS via sync client

----


---

[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client

Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/575#discussion_r141448710
  
    --- Diff: stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---
    @@ -647,8 +740,7 @@ public void sendMessages( final List bodies ) throws IOException {
     
                 @Override
                 public void onError( final Exception e ) {
    -
    -                logger.error( "Error sending message... {}", e );
    +                logger.error("Failed to send this message {}. To this address {}. Error was ", stringBody, url, e);
    --- End diff --
    
    Please use the common log format here as well.


---

[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client

Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/575#discussion_r141446957
  
    --- Diff: stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---
    @@ -541,7 +541,49 @@ public long getQueueDepth() {
     
     
         @Override
    -    public <T extends Serializable> void sendMessageToAllRegions(final T body ) throws IOException {
    +    public <T extends Serializable> void sendMessageToAllRegions(final T body, Boolean async) throws IOException {
    +        boolean sendAsync = async == null ? fig.isAsyncQueue() : async.booleanValue();
    +        if (sendAsync) {
    +            sendMessageToAllRegionsAsync(body);
    +        } else {
    +            sendMessageToAllRegionsSync(body);
    +        }
    +    }
    +
    +
    +    private <T extends Serializable> void sendMessageToAllRegionsSync(final T body) throws IOException {
    +        if ( sns == null ) {
    +            logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
    +            return;
    +        }
    +
    +        final String stringBody = toString( body );
    +
    +        String topicArn = getWriteTopicArn();
    +
    +        if ( logger.isTraceEnabled() ) {
    +            logger.trace( "Publishing Message...{} to arn: {}", stringBody, topicArn );
    +        }
    +
    +        try {
    +            PublishResult publishResult = sns.publish(topicArn, toString(body));
    +            if ( logger.isTraceEnabled() ) {
    +                logger.trace( "Successfully published... messageID=[{}],  arn=[{}]", publishResult.getMessageId(),
    +                    topicArn );
    +            }
    +        } catch (Exception e) {
    +            if (logger.isErrorEnabled()) {
    +                logger.error("Failed to send this message {} to SNS queue at {}", stringBody, topicArn);
    --- End diff --
    
    Suggested wording: "Failed to send this message [{}] to SNS queue at {}, sending asynchronously". Should this be logger.info instead?
    
    Alternatively, should we use the same format in all of these logs, for ease of parsing if we ever need to use a cron job? For example:
    
    "FAILED INDEX REQUEST: Failed to send message to SNS Queue, sending asynchronously. Message:[{}] URL:[{}]"
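
A minimal sketch of one way to keep these failure lines uniform, assuming a small shared helper builds the message text. The class and method names are hypothetical and not part of Usergrid; the wording is adapted from the format proposed above, with the "sending asynchronously" clause left out for brevity.

```java
// Hypothetical helper, not part of Usergrid: builds every "failed send"
// log line in one place so the format stays identical and easy to grep.
class FailedSendLogFormat {

    // Fixed prefix that a cron job or grep could key on (from the proposal above).
    static final String PREFIX = "FAILED INDEX REQUEST: ";

    // destination is e.g. "SNS Queue" or "SQS Queue"; message and url are included verbatim.
    static String format(String destination, String message, String url) {
        return PREFIX + "Failed to send message to " + destination
            + ". Message:[" + message + "] URL:[" + url + "]";
    }

    public static void main(String[] args) {
        // Example call with made-up values.
        System.out.println(format("SNS Queue", "{\"id\":1}", "arn:aws:sns:example"));
    }
}
```

Each catch block or onError callback could then pass the result of format(...) to logger.error along with the exception, so the sync and async paths emit the same line shape.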


---

[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client

Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/575#discussion_r141403595
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---
    @@ -546,38 +550,32 @@ public void update( Entity entity ) throws Exception {
                 handleWriteUniqueVerifyException( entity, wuve );
             }
     
    -        if ( !skipIndexingForType( cpEntity.getId().getType() ) ) {
    +        if (!skipIndexingForType) {
    +            indexEntity(cpEntity, asyncIndex);
    +            deIndexOldVersionsOfEntity(cpEntity);
    +        }
    +    }
     
    -            // queue an event to update the new entity
    -            indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 );
    +    private void indexEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity, Boolean async) {
    +        // queue an event to update the new entity
    +        indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 , async);
    +    }
     
    -            // queue up an event to clean-up older versions than this one from the index
    -            if (entityManagerFig.getDeindexOnUpdate()) {
    -                indexService.queueDeIndexOldVersion( applicationScope, cpEntity.getId(), cpEntity.getVersion());
    -            }
    +    private void deIndexOldVersionsOfEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity) {
    --- End diff --
    
    Do we need the fully qualified name for Entity here? Generally we don't use it.


---

[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client

Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/575#discussion_r141455166
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java ---
    @@ -21,6 +21,7 @@
     
     
     import java.util.ArrayList;
    +import java.util.Collections;
    --- End diff --
    
    will fix 


---

[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client

Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/575#discussion_r141456595
  
    --- Diff: stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---
    @@ -541,7 +541,49 @@ public long getQueueDepth() {
     
     
         @Override
    -    public <T extends Serializable> void sendMessageToAllRegions(final T body ) throws IOException {
    +    public <T extends Serializable> void sendMessageToAllRegions(final T body, Boolean async) throws IOException {
    +        boolean sendAsync = async == null ? fig.isAsyncQueue() : async.booleanValue();
    +        if (sendAsync) {
    +            sendMessageToAllRegionsAsync(body);
    +        } else {
    +            sendMessageToAllRegionsSync(body);
    +        }
    +    }
    +
    +
    +    private <T extends Serializable> void sendMessageToAllRegionsSync(final T body) throws IOException {
    +        if ( sns == null ) {
    +            logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
    +            return;
    +        }
    +
    +        final String stringBody = toString( body );
    +
    +        String topicArn = getWriteTopicArn();
    +
    +        if ( logger.isTraceEnabled() ) {
    +            logger.trace( "Publishing Message...{} to arn: {}", stringBody, topicArn );
    +        }
    +
    +        try {
    +            PublishResult publishResult = sns.publish(topicArn, toString(body));
    +            if ( logger.isTraceEnabled() ) {
    +                logger.trace( "Successfully published... messageID=[{}],  arn=[{}]", publishResult.getMessageId(),
    +                    topicArn );
    +            }
    +        } catch (Exception e) {
    +            if (logger.isErrorEnabled()) {
    +                logger.error("Failed to send this message {} to SNS queue at {}", stringBody, topicArn);
    --- End diff --
    
    I'll make them the same 
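
For reference, the dispatch at the top of the quoted diff treats a null `async` argument as "fall back to the configured default". A minimal sketch of that pattern in isolation, assuming a hypothetical class name and a plain boolean parameter standing in for `fig.isAsyncQueue()`:

```java
// Hypothetical stand-alone illustration; configuredDefault stands in for fig.isAsyncQueue().
class AsyncFlagResolver {

    // null means "the caller has no preference, use the configured default".
    static boolean resolve(Boolean async, boolean configuredDefault) {
        return async == null ? configuredDefault : async.booleanValue();
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, true));          // no preference: default wins
        System.out.println(resolve(Boolean.FALSE, true)); // explicit value overrides the default
    }
}
```

Using the boxed Boolean gives three states (true, false, unspecified) without a second overload, at the cost of callers having to know that null is meaningful.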


---

[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client

Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/575#discussion_r141453716
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---
    @@ -546,38 +550,32 @@ public void update( Entity entity ) throws Exception {
                 handleWriteUniqueVerifyException( entity, wuve );
             }
     
    -        if ( !skipIndexingForType( cpEntity.getId().getType() ) ) {
    +        if (!skipIndexingForType) {
    +            indexEntity(cpEntity, asyncIndex);
    +            deIndexOldVersionsOfEntity(cpEntity);
    +        }
    +    }
     
    -            // queue an event to update the new entity
    -            indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 );
    +    private void indexEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity, Boolean async) {
    +        // queue an event to update the new entity
    +        indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 , async);
    +    }
     
    -            // queue up an event to clean-up older versions than this one from the index
    -            if (entityManagerFig.getDeindexOnUpdate()) {
    -                indexService.queueDeIndexOldVersion( applicationScope, cpEntity.getId(), cpEntity.getVersion());
    -            }
    +    private void deIndexOldVersionsOfEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity) {
    --- End diff --
    
    Otherwise it would be confused with org.apache.usergrid.persistence.Entity, which is brought in by "import org.apache.usergrid.persistence.*;" and is used all over the file.
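
The same situation can be reproduced with JDK classes alone. The sketch below is only an analogy with a hypothetical class name: a wildcard import already supplies one `Date`, so the other `Date` has to be written with its full package name, just as the model `Entity` is in the diff.

```java
import java.util.*;

// Analogy using JDK classes only: the wildcard import brings in java.util.Date,
// so java.sql.Date must be spelled out in full to refer to the other type.
class QualifiedNameDemo {

    // Plain "Date" resolves to java.util.Date through the wildcard import.
    static Date utilEpoch() {
        return new Date(0L);
    }

    // The second Date type has to be fully qualified to be distinguishable.
    static java.sql.Date sqlEpoch() {
        return new java.sql.Date(0L);
    }

    public static void main(String[] args) {
        System.out.println(utilEpoch().getClass().getName());
        System.out.println(sqlEpoch().getClass().getName());
    }
}
```

An explicit single-type import of one of the two classes would also compile, but it would change what the simple name means everywhere else in the file, which is why qualifying the less-used type is the smaller change.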
    



---

[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client

Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/575#discussion_r141456652
  
    --- Diff: stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---
    @@ -625,16 +677,57 @@ public void sendMessages( final List bodies ) throws IOException {
             return successMessages;
         }
     
    -
         @Override
    -    public <T extends Serializable> void sendMessageToLocalRegion(final T body ) throws IOException {
    +    public <T extends Serializable> void sendMessageToLocalRegion(final T body, Boolean async) throws IOException {
    +        boolean sendAsync = async == null ? fig.isAsyncQueue() : async.booleanValue();
    +        if (sendAsync) {
    +            sendMessageToLocalRegionAsync(body);
    +        } else {
    +            sendMessageToLocalRegionSync(body);
    +        }
    +    }
     
    -        if ( sqsAsync == null ) {
    +    private <T extends Serializable> void sendMessageToLocalRegionSync(final T body) throws IOException {
    +
    +        if ( sqs == null ) {
                 logger.error( "SQS client is null, perhaps it failed to initialize successfully" );
                 return;
             }
             final String stringBody = toString( body );
     
    +        if (logger.isDebugEnabled()) {
    +            logger.debug(" sendMessageToLocalRegion " + stringBody);
    +        }
    +
    +        String url = getReadQueue().getUrl();
    +
    +        if ( logger.isTraceEnabled() ) {
    +            logger.trace( "Publishing Message...{} to url: {}", stringBody, url );
    +        }
    +
    +        SendMessageRequest messageRequest = new SendMessageRequest(url, stringBody);
    +        try {
    +            SendMessageResult result = sqs.sendMessage(messageRequest);
    +            if (logger.isTraceEnabled()) {
    +                logger.trace("Successfully published... messageID=[{}],  arn=[{}]", result.getMessageId(),
    +                    url);
    +            }
    +        } catch (Exception e) {
    +            logger.error("Failed to send this message {}. To this address {}. Error was ",  messageRequest.getMessageBody(), url, e);
    --- End diff --
    
    Ditto, I'll make this one the same.


---

[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client

Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/575#discussion_r141444532
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java ---
    @@ -21,6 +21,7 @@
     
     
     import java.util.ArrayList;
    +import java.util.Collections;
    --- End diff --
    
    Looks like this import is extraneous.


---

[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client

Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/575#discussion_r141457438
  
    --- Diff: stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---
    @@ -647,8 +740,7 @@ public void sendMessages( final List bodies ) throws IOException {
     
                 @Override
                 public void onError( final Exception e ) {
    -
    -                logger.error( "Error sending message... {}", e );
    +                logger.error("Failed to send this message {}. To this address {}. Error was ", stringBody, url, e);
    --- End diff --
    
    will do


---

[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client

Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/575#discussion_r141455370
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---
    @@ -546,38 +550,32 @@ public void update( Entity entity ) throws Exception {
                 handleWriteUniqueVerifyException( entity, wuve );
             }
     
    -        if ( !skipIndexingForType( cpEntity.getId().getType() ) ) {
    +        if (!skipIndexingForType) {
    +            indexEntity(cpEntity, asyncIndex);
    +            deIndexOldVersionsOfEntity(cpEntity);
    +        }
    +    }
     
    -            // queue an event to update the new entity
    -            indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 );
    +    private void indexEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity, Boolean async) {
    +        // queue an event to update the new entity
    +        indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 , async);
    +    }
     
    -            // queue up an event to clean-up older versions than this one from the index
    -            if (entityManagerFig.getDeindexOnUpdate()) {
    -                indexService.queueDeIndexOldVersion( applicationScope, cpEntity.getId(), cpEntity.getVersion());
    -            }
    +    private void deIndexOldVersionsOfEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity) {
    --- End diff --
    
    ok, sounds good


---

[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client

Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/575#discussion_r141430100
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---
    @@ -2871,6 +2864,14 @@ else if ( ( v instanceof String ) && isBlank( ( String ) v ) ) {
             return entity;
         }
     
    +    private <A extends Entity> void updateIndexForEniity(String eType, A entity,  long timestamp) throws Exception {
    --- End diff --
    
    Typo in the method name: updateIndexForEniity should be updateIndexForEntity.


---

[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client

Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/575#discussion_r141455194
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---
    @@ -2871,6 +2864,14 @@ else if ( ( v instanceof String ) && isBlank( ( String ) v ) ) {
             return entity;
         }
     
    +    private <A extends Entity> void updateIndexForEniity(String eType, A entity,  long timestamp) throws Exception {
    --- End diff --
    
    will fix


---

[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/usergrid/pull/575


---

[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client

Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/575#discussion_r141448784
  
    --- Diff: stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---
    @@ -625,16 +677,57 @@ public void sendMessages( final List bodies ) throws IOException {
             return successMessages;
         }
     
    -
         @Override
    -    public <T extends Serializable> void sendMessageToLocalRegion(final T body ) throws IOException {
    +    public <T extends Serializable> void sendMessageToLocalRegion(final T body, Boolean async) throws IOException {
    +        boolean sendAsync = async == null ? fig.isAsyncQueue() : async.booleanValue();
    +        if (sendAsync) {
    +            sendMessageToLocalRegionAsync(body);
    +        } else {
    +            sendMessageToLocalRegionSync(body);
    +        }
    +    }
     
    -        if ( sqsAsync == null ) {
    +    private <T extends Serializable> void sendMessageToLocalRegionSync(final T body) throws IOException {
    +
    +        if ( sqs == null ) {
                 logger.error( "SQS client is null, perhaps it failed to initialize successfully" );
                 return;
             }
             final String stringBody = toString( body );
     
    +        if (logger.isDebugEnabled()) {
    +            logger.debug(" sendMessageToLocalRegion " + stringBody);
    +        }
    +
    +        String url = getReadQueue().getUrl();
    +
    +        if ( logger.isTraceEnabled() ) {
    +            logger.trace( "Publishing Message...{} to url: {}", stringBody, url );
    +        }
    +
    +        SendMessageRequest messageRequest = new SendMessageRequest(url, stringBody);
    +        try {
    +            SendMessageResult result = sqs.sendMessage(messageRequest);
    +            if (logger.isTraceEnabled()) {
    +                logger.trace("Successfully published... messageID=[{}],  arn=[{}]", result.getMessageId(),
    +                    url);
    +            }
    +        } catch (Exception e) {
    +            logger.error("Failed to send this message {}. To this address {}. Error was ",  messageRequest.getMessageBody(), url, e);
    --- End diff --
    
    Please use the common log format here as well.


---