Posted to user@beam.apache.org by "Sambasivam, Sasidharan (external)" <s....@metro-external.digital> on 2023/01/09 17:12:01 UTC

Streaming of files using ContextualTextIO.read API

Hi,

We have written a Dataflow pipeline that reads a file from GCS using the ContextualTextIO.read() method with "RecordNumMetadata" enabled, since we also need the line number of each record. We are using Apache Beam 2.38.

We would like to modify the pipeline to continuously watch for new files in GCS, so that each file is processed as soon as it is copied to the location. The documentation describes this:
https://beam.apache.org/releases/javadoc/current/index.html?org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.html

'''
Example 3: streaming new files matching a filepattern.


 Pipeline p = ...;

 PCollection<Row> records = p.apply(ContextualTextIO.read()
     .from("/local/path/to/files/*")
     .watchForNewFiles(
       // Check for new files every minute
       Duration.standardMinutes(1),
       // Stop watching the filepattern if no new files appear within an hour
       afterTimeSinceNewOutput(Duration.standardHours(1))));
'''


However, no "watchForNewFiles" method is available on ContextualTextIO.read().

We are currently trying the code change below to see whether streaming new files by file pattern works with the ContextualTextIO API:

pipeline.apply(ContextualTextIO.read().from(options.getFilePattern())
  .withMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW).continuously(Duration.standardMinutes(20), Growth.never()))
  .withRecordNumMetadata())

However, we get the following error when creating the Dataflow template:

Exception in thread "main" java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.
      at org.apache.beam.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:155)
      at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:212)
      at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:109)
      at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
      at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
      at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:363)
      at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1598)
      at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1487)
      at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
      at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499)
      at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:376)
      at org.apache.beam.sdk.io.contextualtextio.ContextualTextIO$ProcessRecordNumbers.expand(ContextualTextIO.java:693)
      at org.apache.beam.sdk.io.contextualtextio.ContextualTextIO$ProcessRecordNumbers.expand(ContextualTextIO.java:659)
      at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
      at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
      at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:363)
      at org.apache.beam.sdk.io.contextualtextio.ContextualTextIO$Read.expand(ContextualTextIO.java:408)
      at org.apache.beam.sdk.io.contextualtextio.ContextualTextIO$Read.expand(ContextualTextIO.java:248)
      at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
      at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
      at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
      at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:177)
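
Reading the trace, the failure comes from the record-numbering step (ContextualTextIO$ProcessRecordNumbers), which applies a Combine.perKey, and therefore a GroupByKey, to a PCollection that is now unbounded and still in the global window. One possible direction, offered only as a sketch and not confirmed to work for this pipeline, would be to match files with FileIO.match().continuously(...) followed by FileIO.readMatches(), and to number the lines of each file in a custom DoFn. The per-file numbering itself needs only plain Java. The names LineNumbering, NumberedLine and numberLines below are hypothetical, and zero-based numbering is an assumption:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LineNumbering {

  /** A line of text together with its position in the file (zero-based by assumption). */
  public static final class NumberedLine {
    public final long recordNum;
    public final String value;

    NumberedLine(long recordNum, String value) {
      this.recordNum = recordNum;
      this.value = value;
    }
  }

  /**
   * Reads the channel sequentially as UTF-8 text and numbers each line.
   * Because one file is read start to finish by one caller, no grouping
   * or combining across records is needed to derive the numbers.
   */
  public static List<NumberedLine> numberLines(ReadableByteChannel channel) throws IOException {
    List<NumberedLine> out = new ArrayList<>();
    try (BufferedReader reader = new BufferedReader(Channels.newReader(channel, "UTF-8"))) {
      String line;
      long recordNum = 0;
      while ((line = reader.readLine()) != null) {
        out.add(new NumberedLine(recordNum++, line));
      }
    }
    return out;
  }

  public static void main(String[] args) throws IOException {
    // Stand-in for the channel of one matched file.
    byte[] data = "first\nsecond\nthird\n".getBytes(StandardCharsets.UTF_8);
    ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(data));
    for (NumberedLine l : numberLines(channel)) {
      System.out.println(l.recordNum + "\t" + l.value);
    }
  }
}
```

Inside a DoFn over FileIO.ReadableFile, the channel would come from ReadableFile.open(), and each line would be emitted as it is read instead of being collected into a list. The trade-off is that each file is read by a single worker, so very large files are not split.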


Is there any way to stream files using the ContextualTextIO.read() API?

Thanks in advance,

Sasi