You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@usergrid.apache.org by peterj99a <gi...@git.apache.org> on 2017/09/27 15:21:41 UTC
[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client
GitHub user peterj99a opened a pull request:
https://github.com/apache/usergrid/pull/575
Allow submission to SNS/SQS via sync client
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/peterj99a/usergrid master
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/usergrid/pull/575.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #575
----
commit 9a15108924b29997c9ca440b467ab8006a8f8acb
Author: Peter Johnson <pj...@apigee.com>
Date: 2017-09-27T15:19:20Z
Allow submission to SNS/SQS via sync client
----
---
[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client
Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/575#discussion_r141448710
--- Diff: stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---
@@ -647,8 +740,7 @@ public void sendMessages( final List bodies ) throws IOException {
@Override
public void onError( final Exception e ) {
-
- logger.error( "Error sending message... {}", e );
+ logger.error("Failed to send this message {}. To this address {}. Error was ", stringBody, url, e);
--- End diff --
common format
---
[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client
Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/575#discussion_r141446957
--- Diff: stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---
@@ -541,7 +541,49 @@ public long getQueueDepth() {
@Override
- public <T extends Serializable> void sendMessageToAllRegions(final T body ) throws IOException {
+ public <T extends Serializable> void sendMessageToAllRegions(final T body, Boolean async) throws IOException {
+ boolean sendAsync = async == null ? fig.isAsyncQueue() : async.booleanValue();
+ if (sendAsync) {
+ sendMessageToAllRegionsAsync(body);
+ } else {
+ sendMessageToAllRegionsSync(body);
+ }
+ }
+
+
+ private <T extends Serializable> void sendMessageToAllRegionsSync(final T body) throws IOException {
+ if ( sns == null ) {
+ logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
+ return;
+ }
+
+ final String stringBody = toString( body );
+
+ String topicArn = getWriteTopicArn();
+
+ if ( logger.isTraceEnabled() ) {
+ logger.trace( "Publishing Message...{} to arn: {}", stringBody, topicArn );
+ }
+
+ try {
+ PublishResult publishResult = sns.publish(topicArn, toString(body));
+ if ( logger.isTraceEnabled() ) {
+ logger.trace( "Successfully published... messageID=[{}], arn=[{}]", publishResult.getMessageId(),
+ topicArn );
+ }
+ } catch (Exception e) {
+ if (logger.isErrorEnabled()) {
+ logger.error("Failed to send this message {} to SNS queue at {}", stringBody, topicArn);
--- End diff --
"Failed to send this message [{}] to SNS queue at {}, sending asynchronously" -- should this be logger.info?
Or maybe we should have same format in all these logs for ease of parsing if we need to use a cron job?
"FAILED INDEX REQUEST: Failed to send message to SNS Queue, sending asynchronously. Message:[{}] URL:[{}]"
---
[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client
Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/575#discussion_r141403595
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---
@@ -546,38 +550,32 @@ public void update( Entity entity ) throws Exception {
handleWriteUniqueVerifyException( entity, wuve );
}
- if ( !skipIndexingForType( cpEntity.getId().getType() ) ) {
+ if (!skipIndexingForType) {
+ indexEntity(cpEntity, asyncIndex);
+ deIndexOldVersionsOfEntity(cpEntity);
+ }
+ }
- // queue an event to update the new entity
- indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 );
+ private void indexEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity, Boolean async) {
+ // queue an event to update the new entity
+ indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 , async);
+ }
- // queue up an event to clean-up older versions than this one from the index
- if (entityManagerFig.getDeindexOnUpdate()) {
- indexService.queueDeIndexOldVersion( applicationScope, cpEntity.getId(), cpEntity.getVersion());
- }
+ private void deIndexOldVersionsOfEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity) {
--- End diff --
do we need fully qualified name for Entity here? generally we don't
---
[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client
Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/575#discussion_r141455166
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java ---
@@ -21,6 +21,7 @@
import java.util.ArrayList;
+import java.util.Collections;
--- End diff --
will fix
---
[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client
Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/575#discussion_r141456595
--- Diff: stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---
@@ -541,7 +541,49 @@ public long getQueueDepth() {
@Override
- public <T extends Serializable> void sendMessageToAllRegions(final T body ) throws IOException {
+ public <T extends Serializable> void sendMessageToAllRegions(final T body, Boolean async) throws IOException {
+ boolean sendAsync = async == null ? fig.isAsyncQueue() : async.booleanValue();
+ if (sendAsync) {
+ sendMessageToAllRegionsAsync(body);
+ } else {
+ sendMessageToAllRegionsSync(body);
+ }
+ }
+
+
+ private <T extends Serializable> void sendMessageToAllRegionsSync(final T body) throws IOException {
+ if ( sns == null ) {
+ logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
+ return;
+ }
+
+ final String stringBody = toString( body );
+
+ String topicArn = getWriteTopicArn();
+
+ if ( logger.isTraceEnabled() ) {
+ logger.trace( "Publishing Message...{} to arn: {}", stringBody, topicArn );
+ }
+
+ try {
+ PublishResult publishResult = sns.publish(topicArn, toString(body));
+ if ( logger.isTraceEnabled() ) {
+ logger.trace( "Successfully published... messageID=[{}], arn=[{}]", publishResult.getMessageId(),
+ topicArn );
+ }
+ } catch (Exception e) {
+ if (logger.isErrorEnabled()) {
+ logger.error("Failed to send this message {} to SNS queue at {}", stringBody, topicArn);
--- End diff --
I'll make them the same
---
[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client
Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/575#discussion_r141453716
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---
@@ -546,38 +550,32 @@ public void update( Entity entity ) throws Exception {
handleWriteUniqueVerifyException( entity, wuve );
}
- if ( !skipIndexingForType( cpEntity.getId().getType() ) ) {
+ if (!skipIndexingForType) {
+ indexEntity(cpEntity, asyncIndex);
+ deIndexOldVersionsOfEntity(cpEntity);
+ }
+ }
- // queue an event to update the new entity
- indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 );
+ private void indexEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity, Boolean async) {
+ // queue an event to update the new entity
+ indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 , async);
+ }
- // queue up an event to clean-up older versions than this one from the index
- if (entityManagerFig.getDeindexOnUpdate()) {
- indexService.queueDeIndexOldVersion( applicationScope, cpEntity.getId(), cpEntity.getVersion());
- }
+ private void deIndexOldVersionsOfEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity) {
--- End diff --
It would otherwise be confused with org.apache.usergrid.persistence.Entity which is brought in by the "import org.apache.usergrid.persistence.*;" and used all over the file.
I
---
[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client
Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/575#discussion_r141456652
--- Diff: stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---
@@ -625,16 +677,57 @@ public void sendMessages( final List bodies ) throws IOException {
return successMessages;
}
-
@Override
- public <T extends Serializable> void sendMessageToLocalRegion(final T body ) throws IOException {
+ public <T extends Serializable> void sendMessageToLocalRegion(final T body, Boolean async) throws IOException {
+ boolean sendAsync = async == null ? fig.isAsyncQueue() : async.booleanValue();
+ if (sendAsync) {
+ sendMessageToLocalRegionAsync(body);
+ } else {
+ sendMessageToLocalRegionSync(body);
+ }
+ }
- if ( sqsAsync == null ) {
+ private <T extends Serializable> void sendMessageToLocalRegionSync(final T body) throws IOException {
+
+ if ( sqs == null ) {
logger.error( "SQS client is null, perhaps it failed to initialize successfully" );
return;
}
final String stringBody = toString( body );
+ if (logger.isDebugEnabled()) {
+ logger.debug(" sendMessageToLocalRegion " + stringBody);
+ }
+
+ String url = getReadQueue().getUrl();
+
+ if ( logger.isTraceEnabled() ) {
+ logger.trace( "Publishing Message...{} to url: {}", stringBody, url );
+ }
+
+ SendMessageRequest messageRequest = new SendMessageRequest(url, stringBody);
+ try {
+ SendMessageResult result = sqs.sendMessage(messageRequest);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Successfully published... messageID=[{}], arn=[{}]", result.getMessageId(),
+ url);
+ }
+ } catch (Exception e) {
+ logger.error("Failed to send this message {}. To this address {}. Error was ", messageRequest.getMessageBody(), url, e);
--- End diff --
ditto
---
[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client
Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/575#discussion_r141444532
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java ---
@@ -21,6 +21,7 @@
import java.util.ArrayList;
+import java.util.Collections;
--- End diff --
looks like this is extraneous
---
[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client
Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/575#discussion_r141457438
--- Diff: stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---
@@ -647,8 +740,7 @@ public void sendMessages( final List bodies ) throws IOException {
@Override
public void onError( final Exception e ) {
-
- logger.error( "Error sending message... {}", e );
+ logger.error("Failed to send this message {}. To this address {}. Error was ", stringBody, url, e);
--- End diff --
will do
---
[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client
Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/575#discussion_r141455370
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---
@@ -546,38 +550,32 @@ public void update( Entity entity ) throws Exception {
handleWriteUniqueVerifyException( entity, wuve );
}
- if ( !skipIndexingForType( cpEntity.getId().getType() ) ) {
+ if (!skipIndexingForType) {
+ indexEntity(cpEntity, asyncIndex);
+ deIndexOldVersionsOfEntity(cpEntity);
+ }
+ }
- // queue an event to update the new entity
- indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 );
+ private void indexEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity, Boolean async) {
+ // queue an event to update the new entity
+ indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 , async);
+ }
- // queue up an event to clean-up older versions than this one from the index
- if (entityManagerFig.getDeindexOnUpdate()) {
- indexService.queueDeIndexOldVersion( applicationScope, cpEntity.getId(), cpEntity.getVersion());
- }
+ private void deIndexOldVersionsOfEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity) {
--- End diff --
ok, sounds good
---
[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client
Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/575#discussion_r141430100
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---
@@ -2871,6 +2864,14 @@ else if ( ( v instanceof String ) && isBlank( ( String ) v ) ) {
return entity;
}
+ private <A extends Entity> void updateIndexForEniity(String eType, A entity, long timestamp) throws Exception {
--- End diff --
typo in name
---
[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client
Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/575#discussion_r141455194
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---
@@ -2871,6 +2864,14 @@ else if ( ( v instanceof String ) && isBlank( ( String ) v ) ) {
return entity;
}
+ private <A extends Entity> void updateIndexForEniity(String eType, A entity, long timestamp) throws Exception {
--- End diff --
will fix
---
[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/usergrid/pull/575
---
[GitHub] usergrid pull request #575: Allow submission to SNS/SQS via sync client
Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/575#discussion_r141448784
--- Diff: stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---
@@ -625,16 +677,57 @@ public void sendMessages( final List bodies ) throws IOException {
return successMessages;
}
-
@Override
- public <T extends Serializable> void sendMessageToLocalRegion(final T body ) throws IOException {
+ public <T extends Serializable> void sendMessageToLocalRegion(final T body, Boolean async) throws IOException {
+ boolean sendAsync = async == null ? fig.isAsyncQueue() : async.booleanValue();
+ if (sendAsync) {
+ sendMessageToLocalRegionAsync(body);
+ } else {
+ sendMessageToLocalRegionSync(body);
+ }
+ }
- if ( sqsAsync == null ) {
+ private <T extends Serializable> void sendMessageToLocalRegionSync(final T body) throws IOException {
+
+ if ( sqs == null ) {
logger.error( "SQS client is null, perhaps it failed to initialize successfully" );
return;
}
final String stringBody = toString( body );
+ if (logger.isDebugEnabled()) {
+ logger.debug(" sendMessageToLocalRegion " + stringBody);
+ }
+
+ String url = getReadQueue().getUrl();
+
+ if ( logger.isTraceEnabled() ) {
+ logger.trace( "Publishing Message...{} to url: {}", stringBody, url );
+ }
+
+ SendMessageRequest messageRequest = new SendMessageRequest(url, stringBody);
+ try {
+ SendMessageResult result = sqs.sendMessage(messageRequest);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Successfully published... messageID=[{}], arn=[{}]", result.getMessageId(),
+ url);
+ }
+ } catch (Exception e) {
+ logger.error("Failed to send this message {}. To this address {}. Error was ", messageRequest.getMessageBody(), url, e);
--- End diff --
common format
---