Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/03 23:53:17 UTC
[GitHub] [beam] kennknowles opened a new issue, #19406: Inputs SQS with Session based Windowing doesn't work
kennknowles opened a new issue, #19406:
URL: https://github.com/apache/beam/issues/19406
Hi,
I'm trying to use Beam with AWS SQS as an input source, using session windows.
The windows never produce output. The same code works well when the input source is Kafka:
```
// code placeholder
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm");
PipelineOptions options = PipelineOptionsFactory.create();
AwsOptions awsOptions = options.as(AwsOptions.class);
BasicAWSCredentials awsCreds = new BasicAWSCredentials("", "");
awsOptions.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(awsCreds));
awsOptions.setAwsRegion("eu-west-1");
Pipeline p = Pipeline.create(options);
// This example reads a public data set consisting of the complete works of Shakespeare.
p.apply(SqsIO.read().withQueueUrl("https://sqs.eu-west-1.amazonaws.com/XXXXXXXXXXXXXXXX"))
    /* Per session windows */
    .apply(ParDo.of(new DoFn<Message, String>() {
      @ProcessElement
      public void processElement(@Element Message element, OutputReceiver<String> out) {
        // Extract the timestamp from log entry we're currently processing.
        LOG.info("Message Body: {}", element.getBody());
        out.output(element.getBody());
      }
    }))
    // Set windowing configuration
    .apply(
        "WindowIntoSessions",
        Window.<String>into(
                Sessions.withGapDuration(Duration.standardSeconds(5)))
            .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)
            //.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
            // Late data is dropped
            .accumulatingFiredPanes()
            .withAllowedLateness(Duration.ZERO))
    // Extract and count: maps each object to a KV of <userID, count>
    .apply(
        MapElements.into(
                TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
            .via((String testO) -> KV.of(testO, new Integer(1))))
    .apply("CountElements", Sum.integersPerKey())
    .apply("Log", ParDo.of(new FilterTextFn()))
    .apply(
        MapElements.into(TypeDescriptors.strings())
            .via(
                (KV<String, Integer> wordCount) ->
                    wordCount.getKey() + ": " + wordCount.getValue()));
p.run().waitUntilFinish();
}
```
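For reference, here is a minimal plain-Java sketch of what a 5-second session window is expected to do with a handful of element timestamps. It is not Beam's implementation and does not use the Beam API: the class `SessionWindowSketch`, its `Session` type, and the `isClosedAt` helper are hypothetical names made up for illustration, keys are ignored (Beam merges sessions per key), and the "closed" check only approximates the default trigger, which emits a window once the watermark passes the end of that window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of Beam.
public class SessionWindowSketch {

    /** Half-open session interval [start, end), in milliseconds. */
    public static final class Session {
        public final long start;
        public final long end;
        public final int count;

        Session(long start, long end, int count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }

        /** Assumption: a session can emit once the watermark reaches its end. */
        public boolean isClosedAt(long watermarkMillis) {
            return watermarkMillis >= end;
        }
    }

    /**
     * Assigns each timestamp a proto-window [t, t + gap) and merges
     * overlapping proto-windows into sessions.
     */
    public static List<Session> sessionize(long[] timestamps, long gapMillis) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<Session> sessions = new ArrayList<>();
        if (sorted.length == 0) {
            return sessions;
        }
        long start = sorted[0];
        long end = sorted[0] + gapMillis;
        int count = 1;
        for (int i = 1; i < sorted.length; i++) {
            long t = sorted[i];
            if (t < end) {
                // The new proto-window overlaps the current session: extend it.
                end = Math.max(end, t + gapMillis);
                count++;
            } else {
                // Gap reached: close the current session and start a new one.
                sessions.add(new Session(start, end, count));
                start = t;
                end = t + gapMillis;
                count = 1;
            }
        }
        sessions.add(new Session(start, end, count));
        return sessions;
    }

    public static void main(String[] args) {
        long[] timestamps = {0L, 2000L, 4000L, 12000L};
        long watermark = 10000L; // hypothetical watermark position
        for (Session s : sessionize(timestamps, 5000L)) {
            System.out.println("[" + s.start + ", " + s.end + ") count=" + s.count
                + " closed=" + s.isClosedAt(watermark));
        }
    }
}
```

Under these assumptions, a session stays open for as long as the watermark remains before its end, so a source whose watermark does not advance would leave every session pending. Whether that is what happens with SqsIO here is not confirmed.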
Imported from Jira [BEAM-7498](https://issues.apache.org/jira/browse/BEAM-7498). Original Jira may contain additional context.
Reported by: esteveavi.