Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/03 23:53:17 UTC

[GitHub] [beam] kennknowles opened a new issue, #19406: Inputs SQS with Session based Windowing doesn't work

kennknowles opened a new issue, #19406:
URL: https://github.com/apache/beam/issues/19406

   Hi,

   I'm trying to use Beam with the AWS SQS service as an input source, using session windows.

   The windows are never executed. The same code works well when the input source is Kafka:
   ```
   SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm");

   PipelineOptions options = PipelineOptionsFactory.create();
   AwsOptions awsOptions = options.as(AwsOptions.class);
   BasicAWSCredentials awsCreds = new BasicAWSCredentials("", "");
   awsOptions.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(awsCreds));
   awsOptions.setAwsRegion("eu-west-1");

   Pipeline p = Pipeline.create(options);

   // Read messages from the SQS queue.
   p.apply(SqsIO.read().withQueueUrl("https://sqs.eu-west-1.amazonaws.com/XXXXXXXXXXXXXXXX"))

       // Log each message body and emit it as a String.
       .apply(ParDo.of(new DoFn<Message, String>() {
         @ProcessElement
         public void processElement(@Element Message element, OutputReceiver<String> out) {
           LOG.info("Message Body: {}", element.getBody());
           out.output(element.getBody());
         }
       }))

       // Set windowing configuration: per-session windows with a 5-second gap.
       .apply(
           "WindowIntoSessions",
           Window.<String>into(Sessions.withGapDuration(Duration.standardSeconds(5)))
               .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)
               //.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
               // Late data is dropped
               .accumulatingFiredPanes()
               .withAllowedLateness(Duration.ZERO))

       // Extract and count: map each element to a KV of <userID, 1>.
       .apply(
           MapElements.into(
                   TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
               .via((String testO) -> KV.of(testO, 1)))

       .apply("CountElements", Sum.integersPerKey())

       .apply("Log", ParDo.of(new FilterTextFn()))

       .apply(
           MapElements.into(TypeDescriptors.strings())
               .via((KV<String, Integer> wordCount) ->
                   wordCount.getKey() + ": " + wordCount.getValue()));

   p.run().waitUntilFinish();
   } // end of the enclosing method (its declaration is not included in this report)
   ```
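
For reference, here is a minimal plain-Java sketch of how a 5-second session gap is expected to group element timestamps. It is not Beam code and does not use SqsIO; the names `SessionWindowSketch`, `mergeSessions`, and `watermarkPassed` are hypothetical and exist only for this illustration. It assumes each element opens a window `[timestamp, timestamp + gap)` and that overlapping windows are merged, and it uses a simplified model of the default trigger in which a session's result is emitted only once the watermark has reached the end of the merged window. Whether the watermark advances that far when reading from SQS is not established by this report.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Plain-Java illustration of session-window merging; this is NOT Beam code. */
final class SessionWindowSketch {

    /**
     * Merges element timestamps (milliseconds) into sessions.
     * Each element opens a window [ts, ts + gapMillis); windows that overlap
     * are merged into one session, returned as {start, endExclusive}.
     */
    static List<long[]> mergeSessions(List<Long> timestampsMillis, long gapMillis) {
        List<Long> sorted = new ArrayList<>(timestampsMillis);
        Collections.sort(sorted);
        List<long[]> sessions = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = sessions.isEmpty() ? null : sessions.get(sessions.size() - 1);
            if (last != null && ts < last[1]) {
                // The element falls inside the current session: extend its end.
                last[1] = Math.max(last[1], ts + gapMillis);
            } else {
                // Gap exceeded (or first element): start a new session.
                sessions.add(new long[] {ts, ts + gapMillis});
            }
        }
        return sessions;
    }

    /**
     * Simplified model (an assumption of this sketch): a session's result
     * can be emitted once the watermark has reached the end of the session.
     */
    static boolean watermarkPassed(long[] session, long watermarkMillis) {
        return watermarkMillis >= session[1];
    }

    public static void main(String[] args) {
        // Three elements close together, then one element much later.
        List<Long> timestamps = Arrays.asList(0L, 2_000L, 4_000L, 20_000L);
        for (long[] s : mergeSessions(timestamps, 5_000L)) {
            System.out.println("session [" + s[0] + ", " + s[1] + ")"
                + " emitted at watermark 10000? " + watermarkPassed(s, 10_000L));
        }
    }
}
```

In this model, a session whose end is never reached by the watermark never emits a result, even though its elements were read and logged upstream.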
   
    
   
   Imported from Jira [BEAM-7498](https://issues.apache.org/jira/browse/BEAM-7498). Original Jira may contain additional context.
   Reported by: esteveavi.
