Posted to reviews@bahir.apache.org by GitBox <gi...@apache.org> on 2020/05/25 09:39:17 UTC

[GitHub] [bahir] DmitryGrb opened a new pull request #97: [BAHIR-233] Add SNS message support for SQS streaming source

DmitryGrb opened a new pull request #97:
URL: https://github.com/apache/bahir/pull/97


   Added a `messageWrapper` option to the SQS streaming connector that specifies whether
   each message is a plain S3 notification event or an S3 notification delivered via an SNS topic.
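For illustration, here is a minimal sketch of how a `messageWrapper` option value might be mapped onto the two wrapper kinds. It assumes an `Enumeration` shaped like the `S3MessageWrapper.None` / `S3MessageWrapper.SNS` values visible in the diffs below, a default of `None` as in the README row, and case-insensitive matching. `MessageWrapperOption` and `parseMessageWrapper` are hypothetical names, not the connector's actual code.

```scala
// Hypothetical sketch mirroring the S3MessageWrapper.None / .SNS values seen in the diff.
object S3MessageWrapper extends Enumeration {
  type S3MessageWrapper = Value
  // Note: this `None` shadows scala.None wherever S3MessageWrapper._ is imported.
  val None, SNS = Value
}

object MessageWrapperOption {
  // Assumed default, matching the README row "messageWrapper|None|...".
  val Default: S3MessageWrapper.Value = S3MessageWrapper.None

  // Hypothetical helper: reads "messageWrapper" from the user options, case-insensitively,
  // and fails fast on unknown values instead of silently using the default.
  def parseMessageWrapper(options: Map[String, String]): S3MessageWrapper.Value =
    options.get("messageWrapper") match {
      case None => Default // scala.None here: the option was not set
      case Some(raw) =>
        S3MessageWrapper.values
          .find(_.toString.equalsIgnoreCase(raw.trim))
          .getOrElse(throw new IllegalArgumentException(
            s"Invalid messageWrapper '$raw'; expected one of: " +
              S3MessageWrapper.values.mkString(", ")))
    }
}
```

Rejecting unknown values keeps a typo such as `messageWrapper=SMS` from silently falling back to the plain-S3 parsing path.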


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir] DmitryGrb commented on a change in pull request #97: [BAHIR-233] Add SNS message support for SQS streaming source

Posted by GitBox <gi...@apache.org>.
DmitryGrb commented on a change in pull request #97:
URL: https://github.com/apache/bahir/pull/97#discussion_r458672251



##########
File path: sql-streaming-sqs/README.md
##########
@@ -63,6 +63,7 @@ shouldSortFiles|true|whether to sort files based on timestamp while listing them
 useInstanceProfileCredentials|false|Whether to use EC2 instance profile credentials for connecting to Amazon SQS
 maxFilesPerTrigger|no default value|maximum number of files to process in a microbatch
 maxFileAge|7d|Maximum age of a file that can be found in this directory
+messageWrapper|None|'None' if SQS contains plain S3 message. 'SNS' if SQS contains S3 notification message which came from SNS

Review comment:
       Done. Added a small section that describes the differences between the SQS and SNS approaches.







[GitHub] [bahir] abhishekd0907 commented on pull request #97: [BAHIR-233] Add SNS message support for SQS streaming source

Posted by GitBox <gi...@apache.org>.
abhishekd0907 commented on pull request #97:
URL: https://github.com/apache/bahir/pull/97#issuecomment-667229649


   @lresende Can you please review this PR?





[GitHub] [bahir] abhishekd0907 commented on pull request #97: [BAHIR-233] Add SNS message support for SQS streaming source

Posted by GitBox <gi...@apache.org>.
abhishekd0907 commented on pull request #97:
URL: https://github.com/apache/bahir/pull/97#issuecomment-696030475


   @lresende 
   Any update on this PR from your side?
   





[GitHub] [bahir] DmitryGrb commented on a change in pull request #97: [BAHIR-233] Add SNS message support for SQS streaming source

Posted by GitBox <gi...@apache.org>.
DmitryGrb commented on a change in pull request #97:
URL: https://github.com/apache/bahir/pull/97#discussion_r458670009



##########
File path: sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala
##########
@@ -131,13 +131,24 @@ class SqsClient(sourceOptions: SqsSourceOptions,
     }
   }
 
+  private def extractS3Message(parsedBody: JValue): JValue = {
+    implicit val formats = DefaultFormats
+    sourceOptions.messageWrapper match {
+      case sourceOptions.S3MessageWrapper.None => parsedBody
+      case sourceOptions.S3MessageWrapper.SNS => parse((parsedBody \ "Message").extract[String])

Review comment:
       Done. Added a small section that describes the differences.







[GitHub] [bahir] lresende merged pull request #97: [BAHIR-233] Add SNS message support for SQS streaming source

Posted by GitBox <gi...@apache.org>.
lresende merged pull request #97:
URL: https://github.com/apache/bahir/pull/97


   





[GitHub] [bahir] DmitryGrb commented on a change in pull request #97: [BAHIR-233] Add SNS message support for SQS streaming source

Posted by GitBox <gi...@apache.org>.
DmitryGrb commented on a change in pull request #97:
URL: https://github.com/apache/bahir/pull/97#discussion_r463778601



##########
File path: sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala
##########
@@ -131,13 +131,24 @@ class SqsClient(sourceOptions: SqsSourceOptions,
     }
   }
 
+  private def extractS3Message(parsedBody: JValue): JValue = {
+    implicit val formats = DefaultFormats
+    sourceOptions.messageWrapper match {
+      case sourceOptions.S3MessageWrapper.None => parsedBody
+      case sourceOptions.S3MessageWrapper.SNS => parse((parsedBody \ "Message").extract[String])

Review comment:
       Thanks for the details! I will raise a ticket shortly.







[GitHub] [bahir] abhishekd0907 commented on a change in pull request #97: [BAHIR-233] Add SNS message support for SQS streaming source

Posted by GitBox <gi...@apache.org>.
abhishekd0907 commented on a change in pull request #97:
URL: https://github.com/apache/bahir/pull/97#discussion_r463742385



##########
File path: sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala
##########
@@ -131,13 +131,24 @@ class SqsClient(sourceOptions: SqsSourceOptions,
     }
   }
 
+  private def extractS3Message(parsedBody: JValue): JValue = {
+    implicit val formats = DefaultFormats
+    sourceOptions.messageWrapper match {
+      case sourceOptions.S3MessageWrapper.None => parsedBody
+      case sourceOptions.S3MessageWrapper.SNS => parse((parsedBody \ "Message").extract[String])

Review comment:
       I see. There's a retry-on-failure mechanism for errors when fetching messages from SQS. Maybe we should also have a retry-on-failure option for errors when parsing messages. Of course, that's not related to this change and can be done as part of a separate PR.







[GitHub] [bahir] DmitryGrb commented on a change in pull request #97: [BAHIR-233] Add SNS message support for SQS streaming source

Posted by GitBox <gi...@apache.org>.
DmitryGrb commented on a change in pull request #97:
URL: https://github.com/apache/bahir/pull/97#discussion_r463750572



##########
File path: sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala
##########
@@ -131,13 +131,24 @@ class SqsClient(sourceOptions: SqsSourceOptions,
     }
   }
 
+  private def extractS3Message(parsedBody: JValue): JValue = {
+    implicit val formats = DefaultFormats
+    sourceOptions.messageWrapper match {
+      case sourceOptions.S3MessageWrapper.None => parsedBody
+      case sourceOptions.S3MessageWrapper.SNS => parse((parsedBody \ "Message").extract[String])

Review comment:
       Right now, if we fail to parse SQS messages, we just remove them from SQS. I think it would make sense to try to refetch them instead. And I agree, this should be done in another PR. I can raise a ticket and make these changes. Thoughts/objections?







[GitHub] [bahir] DmitryGrb commented on a change in pull request #97: [BAHIR-233] Add SNS message support for SQS streaming source

Posted by GitBox <gi...@apache.org>.
DmitryGrb commented on a change in pull request #97:
URL: https://github.com/apache/bahir/pull/97#discussion_r463739330



##########
File path: sql-streaming-sqs/README.md
##########
@@ -63,6 +63,18 @@ shouldSortFiles|true|whether to sort files based on timestamp while listing them
 useInstanceProfileCredentials|false|Whether to use EC2 instance profile credentials for connecting to Amazon SQS
 maxFilesPerTrigger|no default value|maximum number of files to process in a microbatch
 maxFileAge|7d|Maximum age of a file that can be found in this directory
+messageWrapper|None| - 'None' if SQS contains plain S3 message. <br/> - 'SNS' if SQS contains S3 notification message which came from SNS. <br/> Please see 'SQS vs SNS' section for more details 
+
+## SQS vs SNS

Review comment:
       Makes total sense. Changed, thanks.







[GitHub] [bahir] abhishekd0907 commented on a change in pull request #97: [BAHIR-233] Add SNS message support for SQS streaming source

Posted by GitBox <gi...@apache.org>.
abhishekd0907 commented on a change in pull request #97:
URL: https://github.com/apache/bahir/pull/97#discussion_r463772633



##########
File path: sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala
##########
@@ -131,13 +131,24 @@ class SqsClient(sourceOptions: SqsSourceOptions,
     }
   }
 
+  private def extractS3Message(parsedBody: JValue): JValue = {
+    implicit val formats = DefaultFormats
+    sourceOptions.messageWrapper match {
+      case sourceOptions.S3MessageWrapper.None => parsedBody
+      case sourceOptions.S3MessageWrapper.SNS => parse((parsedBody \ "Message").extract[String])

Review comment:
       In my use case, I've seen some extra messages in the SQS queue in the following scenarios:
   - when the S3 bucket is configured to send notifications to the SQS queue during pipeline setup
   - when some configuration is changed
   - some test messages
   
   These are not `ObjectCreated` or `ObjectRemoved` messages, so they are not relevant to the Spark consumer application, and it seemed reasonable to ignore and delete them.
   
   But if the Spark consumer receives too many unparsable messages, there might be an issue with the pipeline setup. So, in my opinion, it makes sense to throw an error and abort the pipeline using a finite max-retries mechanism.
   
   Refetching and trying to parse the same message again may help if a legitimate message gets corrupted while being fetched from SQS. Can you list the scenarios where refetching and re-parsing the same message would help?
   
   Feel free to raise a ticket and start a PR. We can discuss this further there.
   
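The trade-off discussed above (delete irrelevant messages, but abort if too many messages fail to parse) can be sketched as a small policy object. This is a minimal sketch under stated assumptions. `ParseFailurePolicy`, `maxConsecutiveFailures`, `Outcome` and `Decision` are hypothetical names, not part of the connector. The S3 event name is assumed to be available as a string such as `ObjectCreated:Put`.

```scala
// Hypothetical outcome of attempting to parse one SQS message.
sealed trait Outcome
final case class Parsed(eventName: String) extends Outcome
final case class Unparsable(reason: String) extends Outcome

// Hypothetical decision for that message.
sealed trait Decision
case object Process extends Decision       // relevant S3 event: hand the file to Spark
case object DeleteAndSkip extends Decision // irrelevant or unparsable: delete from the queue
final case class Abort(msg: String) extends Decision

final class ParseFailurePolicy(maxConsecutiveFailures: Int) {
  require(maxConsecutiveFailures > 0, "maxConsecutiveFailures must be positive")
  private var consecutiveFailures = 0

  def decide(outcome: Outcome): Decision = outcome match {
    case Parsed(name) if name.startsWith("ObjectCreated") || name.startsWith("ObjectRemoved") =>
      consecutiveFailures = 0
      Process
    case Parsed(_) =>
      // Well-formed, but neither ObjectCreated nor ObjectRemoved: not relevant to the consumer.
      consecutiveFailures = 0
      DeleteAndSkip
    case Unparsable(reason) =>
      consecutiveFailures += 1
      if (consecutiveFailures >= maxConsecutiveFailures)
        Abort(s"$consecutiveFailures consecutive unparsable messages; last error: $reason")
      else DeleteAndSkip
  }
}
```

Counting *consecutive* failures is one possible reading of "too many unparsable messages". A failure ratio over a sliding window would be an equally plausible alternative.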







[GitHub] [bahir] DmitryGrb commented on a change in pull request #97: [BAHIR-233] Add SNS message support for SQS streaming source

Posted by GitBox <gi...@apache.org>.
DmitryGrb commented on a change in pull request #97:
URL: https://github.com/apache/bahir/pull/97#discussion_r458670009



##########
File path: sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala
##########
@@ -131,13 +131,24 @@ class SqsClient(sourceOptions: SqsSourceOptions,
     }
   }
 
+  private def extractS3Message(parsedBody: JValue): JValue = {
+    implicit val formats = DefaultFormats
+    sourceOptions.messageWrapper match {
+      case sourceOptions.S3MessageWrapper.None => parsedBody
+      case sourceOptions.S3MessageWrapper.SNS => parse((parsedBody \ "Message").extract[String])

Review comment:
       Yes, it throws a MappingException; however, the error message was not descriptive enough:
   `org.json4s.package$MappingException: Did not find value which can be converted into java.lang.String`
   
   So I have updated the code to explicitly check for the `Message` field in the JSON and, if it is missing, throw a MappingException with more details. Ideally we would have our own exception here. However, I used org.json4s.MappingException because it is caught a few lines later anyway and is not exposed anywhere.
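The comment notes that a dedicated exception would be preferable to reusing `org.json4s.MappingException`. A minimal sketch of that idea follows. It avoids json4s so that it runs on the standard library alone, and it assumes the SNS envelope's top-level string fields (such as `Type` and `Message`) have already been decoded into a `Map[String, String]`. `MissingSnsMessageException` and `extractSnsMessage` are hypothetical names.

```scala
// Hypothetical dedicated exception, as suggested in the comment above.
final class MissingSnsMessageException(msg: String) extends RuntimeException(msg)

object SnsEnvelope {
  // Assumption: top-level string fields of the SNS envelope, already decoded from JSON.
  // Returns the inner S3 notification JSON carried in the "Message" field.
  def extractSnsMessage(envelope: Map[String, String]): String =
    envelope.get("Message") match {
      case Some(inner) if inner.trim.nonEmpty => inner
      case Some(_) =>
        throw new MissingSnsMessageException("SNS envelope has an empty 'Message' field")
      case None =>
        val keys = if (envelope.isEmpty) "<none>" else envelope.keys.toSeq.sorted.mkString(", ")
        // Listing the keys that were found makes a plain S3 message (e.g. with "Records")
        // easy to recognise when messageWrapper is set to SNS by mistake.
        throw new MissingSnsMessageException(
          s"Expected an SNS envelope with a 'Message' field, found keys: $keys. " +
            "Is messageWrapper set to SNS for a queue that receives plain S3 notifications?")
    }
}
```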







[GitHub] [bahir] abhishekd0907 commented on a change in pull request #97: [BAHIR-233] Add SNS message support for SQS streaming source

Posted by GitBox <gi...@apache.org>.
abhishekd0907 commented on a change in pull request #97:
URL: https://github.com/apache/bahir/pull/97#discussion_r463725913



##########
File path: sql-streaming-sqs/README.md
##########
@@ -63,6 +63,18 @@ shouldSortFiles|true|whether to sort files based on timestamp while listing them
 useInstanceProfileCredentials|false|Whether to use EC2 instance profile credentials for connecting to Amazon SQS
 maxFilesPerTrigger|no default value|maximum number of files to process in a microbatch
 maxFileAge|7d|Maximum age of a file that can be found in this directory
+messageWrapper|None| - 'None' if SQS contains plain S3 message. <br/> - 'SNS' if SQS contains S3 notification message which came from SNS. <br/> Please see 'SQS vs SNS' section for more details 
+
+## SQS vs SNS

Review comment:
       Maybe change the title to "Using Multiple Consumers".







[GitHub] [bahir] abhishekd0907 commented on pull request #97: [BAHIR-233] Add SNS message support for SQS streaming source

Posted by GitBox <gi...@apache.org>.
abhishekd0907 commented on pull request #97:
URL: https://github.com/apache/bahir/pull/97#issuecomment-673421393


   @lresende Did you get a chance to review this PR? Changes LGTM.





[GitHub] [bahir] abhishekd0907 commented on pull request #97: [BAHIR-233] Add SNS message support for SQS streaming source

Posted by GitBox <gi...@apache.org>.
abhishekd0907 commented on pull request #97:
URL: https://github.com/apache/bahir/pull/97#issuecomment-667300146


   Let's wait for a review from @lresende. Is it possible to add a simple unit test for the parsing logic of a message received via SNS? I acknowledge it might not be straightforward, since the `parseSqsMessages` method is private and there is currently no test for the parsing logic of a message received directly via S3 -> SQS. You could consider making `parseSqsMessages` public if that helps.





[GitHub] [bahir] abhishekd0907 commented on a change in pull request #97: [BAHIR-233] Add SNS message support for SQS streaming source

Posted by GitBox <gi...@apache.org>.
abhishekd0907 commented on a change in pull request #97:
URL: https://github.com/apache/bahir/pull/97#discussion_r449407090



##########
File path: sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala
##########
@@ -131,13 +131,24 @@ class SqsClient(sourceOptions: SqsSourceOptions,
     }
   }
 
+  private def extractS3Message(parsedBody: JValue): JValue = {
+    implicit val formats = DefaultFormats
+    sourceOptions.messageWrapper match {
+      case sourceOptions.S3MessageWrapper.None => parsedBody
+      case sourceOptions.S3MessageWrapper.SNS => parse((parsedBody \ "Message").extract[String])

Review comment:
       Will this also throw a MappingException when it encounters a message in a different format? Say `S3MessageWrapper` is set to SNS, but the SQS queue contains a message that comes directly from S3 rather than via SNS.

##########
File path: sql-streaming-sqs/README.md
##########
@@ -63,6 +63,7 @@ shouldSortFiles|true|whether to sort files based on timestamp while listing them
 useInstanceProfileCredentials|false|Whether to use EC2 instance profile credentials for connecting to Amazon SQS
 maxFilesPerTrigger|no default value|maximum number of files to process in a microbatch
 maxFileAge|7d|Maximum age of a file that can be found in this directory
+messageWrapper|None|'None' if SQS contains plain S3 message. 'SNS' if SQS contains S3 notification message which came from SNS

Review comment:
       Can you add a short (2-3 line) section to the README explaining how to use the S3-SQS data source with multiple consumers by setting `messageWrapper` to SNS? Whatever you've explained in the JIRA should be sufficient.
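The following sketch illustrates why `messageWrapper=SNS` enables multiple consumers. An SQS message is normally processed and deleted by a single consumer. An SNS topic, by contrast, delivers a copy of each notification to every subscribed queue, wrapped in an envelope whose `Message` field carries the original S3 event. The sketch models that fan-out with plain Scala collections. `SnsNotification`, `FanOut`, the topic ARN and the queue names are hypothetical, and only two envelope fields are modeled.

```scala
// Hypothetical, simplified model of an SNS notification envelope (only two fields).
final case class SnsNotification(topicArn: String, message: String)

object FanOut {
  // Each subscribed queue receives its own envelope wrapping the same S3 event JSON,
  // so each Spark consumer can read (and delete from) its own queue independently.
  def fanOut(topicArn: String, s3EventJson: String, queues: Seq[String]): Map[String, SnsNotification] =
    queues.map(queue => queue -> SnsNotification(topicArn, s3EventJson)).toMap

  // What a consumer configured with messageWrapper=SNS would unwrap before parsing.
  def unwrap(n: SnsNotification): String = n.message
}
```

If raw message delivery is enabled on an SNS-to-SQS subscription, SNS delivers the message body without the envelope. In that case the plain (`None`) setting would presumably be the matching one.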



