Posted to user@beam.apache.org by Piotr Filipiuk <pi...@gmail.com> on 2020/10/08 20:55:30 UTC

Processing files as they arrive with custom timestamps

Hi,

I am looking into:
https://beam.apache.org/documentation/patterns/file-processing/ since I
would like to create a continuous pipeline that reads from files and
assigns Event Times based on e.g. file metadata or actual data inside the
file. For example:

private static void run(String[] args) {
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
  Pipeline pipeline = Pipeline.create(options);

  PCollection<Metadata> matches = pipeline
      .apply(FileIO.match()
          .filepattern("/tmp/input/*")
          .continuously(Duration.standardSeconds(15), Watch.Growth.never()));
  matches
      .apply(ParDo.of(new ReadFileFn()));

  pipeline.run();
}

private static final class ReadFileFn extends DoFn<Metadata, String> {
  private static final Logger logger =
      LoggerFactory.getLogger(ReadFileFn.class);

  @ProcessElement
  public void processElement(ProcessContext c) throws IOException {
    Metadata metadata = c.element();
    // I believe c.timestamp() is based on processing time.
    logger.info("reading {} @ {}", metadata, c.timestamp());
    String filename = metadata.resourceId().toString();
    // Output timestamps must be no earlier than the timestamp of the
    // current input minus the allowed skew (0 milliseconds).
    Instant timestamp = new Instant(metadata.lastModifiedMillis());
    logger.info("lastModified @ {}", timestamp);
    try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
      String line;
      while ((line = br.readLine()) != null) {
        c.outputWithTimestamp(line, timestamp);
      }
    }
  }
}

The issue is that when calling c.outputWithTimestamp() I am getting:

Caused by: java.lang.IllegalArgumentException: Cannot output with timestamp
1970-01-01T00:00:00.000Z. Output timestamps must be no earlier than the
timestamp of the current input (2020-10-08T20:39:44.286Z) minus the allowed
skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for
details on changing the allowed skew.

I believe this is because MatchPollFn.apply() uses Instant.now() as the
event time for the PCollection<Metadata>. I can see that the call to
continuously() makes the PCollection unbounded and assigns a default event
time. Without the call to continuously() I can assign the timestamps
without problems, either via c.outputWithTimestamp() or the WithTimestamps
transform.
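
For illustration, the bound named in the exception can be written out in a
few lines of plain Java. This is only a sketch using java.time, not Beam's
implementation; SkewCheck and isAllowed are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;

/** Plain-JDK illustration of the bound described in the error message; not Beam code. */
final class SkewCheck {

  /** Returns true when the output timestamp is no earlier than input minus the allowed skew. */
  static boolean isAllowed(Instant output, Instant input, Duration allowedSkew) {
    return !output.isBefore(input.minus(allowedSkew));
  }

  public static void main(String[] args) {
    // Input timestamp and output timestamp taken from the exception text above.
    Instant input = Instant.parse("2020-10-08T20:39:44.286Z");
    Instant output = Instant.EPOCH; // 1970-01-01T00:00:00Z

    System.out.println(isAllowed(output, input, Duration.ZERO)); // false
    System.out.println(isAllowed(input, input, Duration.ZERO)); // true
  }
}
```

With a skew of zero, any output timestamp earlier than the input element's
timestamp is rejected, which matches what the exception reports.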

I would like to know how to fix this issue, and whether this use case is
currently supported in Beam.

-- 
Best regards,
Piotr

Re: Processing files as they arrive with custom timestamps

Posted by Piotr Filipiuk <pi...@gmail.com>.
Hi,

We realized that the above implementation does not work correctly with
Flink Runner (with DirectRunner it works as expected). It seems that the
call to:

Watch.Growth.PollResult.incomplete(outputs).withWatermark(watermark);

does not correctly update the watermarks. We observed that the initial
value of the watermark never changes. That is, if initially there are no
files to match, the watermark is equal to Instant.EPOCH and it stays that
way. If files exist before the pipeline executes MatchPollFn for the first
time, the watermark is correctly set based on the file name; however, even
though new files are created, the watermark is not updated.
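
As a point of comparison, Luke's suggestion quoted further down in this
thread was to derive the watermark from the largest "complete" timestamp
and to leave it unset when there is none. A minimal plain-Java sketch of
that rule follows. It uses java.time rather than Beam or Joda types, takes
bare file names instead of Metadata, and CompleteMarkerWatermark and
watermarkFor are hypothetical names. It only illustrates the rule and says
nothing about how the Flink Runner propagates the value.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Sketch: compute a watermark only from yyyy-MM-dd.complete marker files. */
final class CompleteMarkerWatermark {
  static final String SUFFIX = ".complete";

  /**
   * Returns the start of the day after the latest completed day, or empty when no marker file
   * is present (in which case the caller would not set a watermark at all).
   */
  static Optional<Instant> watermarkFor(List<String> filenames) {
    return filenames.stream()
        .filter(name -> name.endsWith(SUFFIX))
        .map(name -> name.substring(0, name.length() - SUFFIX.length()))
        .map(day -> LocalDate.parse(day).plusDays(1).atStartOfDay(ZoneOffset.UTC).toInstant())
        .max(Comparator.naturalOrder());
  }

  public static void main(String[] args) {
    // Only a data file so far: no watermark is reported.
    System.out.println(watermarkFor(List.of("2020-01-01")).isPresent()); // false

    // The first day is complete; the second day's data file does not move the watermark.
    System.out.println(
        watermarkFor(List.of("2020-01-01", "2020-01-01.complete", "2020-01-02")));
    // prints Optional[2020-01-02T00:00:00Z]
  }
}
```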

We tried the following setups; both exhibit the same issue:

   - Flink 1.11.2 with Beam 2.25
   - Flink 1.10.2 with Beam 2.24

The issue can be reproduced using the following test (or by running the
below pipeline directly with Flink Runner):

@RunWith(JUnit4.class)
public class ReadFilesTest {
  String[] args = {
      "--runner=FlinkRunner", "--checkpointingInterval=60000"
  };

  @Rule
  public final transient TestPipeline p =
      TestPipeline.fromOptions(PipelineOptionsFactory.fromArgs(args).create());

  @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
  private static final Logger logger =
      LoggerFactory.getLogger(ReadFilesTest.class);

  private static void printFiles(Path filepath) {
    logger.info("filepath is {}", filepath);
    File folder = new File(filepath.getParent().toString());
    File[] listOfFiles = folder.listFiles();
    logger.info("found {} files", listOfFiles.length);
    for (File file : listOfFiles) {
      logger.info("found {}", file.getPath());
    }
  }

  private static void assertFileContent(Path filepath, String expected, Duration timeout)
      throws InterruptedException, IOException {
    Instant start = Instant.now();
    long sleepMillis = 100;

    do {
      printFiles(filepath);
      Thread.sleep(sleepMillis);
    } while (Files.notExists(filepath) && start.plus(timeout).isAfter(Instant.now()));

    String content = Files.readString(filepath).trim();
    Truth.assertThat(content).isEqualTo(expected);
  }

  @Test
  public void testRunNeverEnds() throws InterruptedException, IOException {
    final Duration duration = Duration.standardDays(1);

    IntervalWindow firstWindow =
        new IntervalWindow(Instant.parse("2020-01-01T00:00:00.000Z"), duration);
    logger.info("first window {}", firstWindow);
    IntervalWindow secondWindow =
        new IntervalWindow(Instant.parse("2020-01-02T00:00:00.000Z"), duration);
    logger.info("second window {}", secondWindow);

    TemporaryFolder outputFolder = new TemporaryFolder();
    outputFolder.create();

    ExecutorService executorService = Executors.newSingleThreadExecutor();
    executorService.execute(
        () -> {
          ReadFiles.run(
              p,
              tmpFolder.getRoot().getAbsolutePath() + "/*",
              Duration.millis(100),
              Growth.never(),
              Paths.get(outputFolder.getRoot().getAbsolutePath()).resolve("results").toString());
          p.run();
        });

    // This sleep duration might need to be adjusted to make sure the pipeline is running
    // before we create the input files.
    Thread.sleep(10000);

    Path firstPath = tmpFolder.newFile("2020-01-01").toPath();
    Files.write(firstPath, ImmutableList.of());

    Thread.sleep(1000);

    Path firstPathComplete = tmpFolder.newFile("2020-01-01.complete").toPath();
    Files.write(firstPathComplete, ImmutableList.of());

    Thread.sleep(1000);

    Path secondPath = tmpFolder.newFile("2020-01-02").toPath();
    Files.write(secondPath, ImmutableList.of());

    Thread.sleep(1000);

    Path secondPathComplete = tmpFolder.newFile("2020-01-02.complete").toPath();
    Files.write(secondPathComplete, ImmutableList.of());

    assertFileContent(
        outputFolder
            .getRoot()
            .toPath()
            .resolve(
                "results2020-01-01T00:00:00.000Z-2020-01-02T00:00:00.000Z-pane-0-last-00000-of-00001"),
        "my-key,3",
        Duration.standardSeconds(30));
    assertFileContent(
        outputFolder
            .getRoot()
            .toPath()
            .resolve(
                "results2020-01-02T00:00:00.000Z-2020-01-03T00:00:00.000Z-pane-0-last-00000-of-00001"),
        "my-key,3",
        Duration.standardSeconds(30));
    executorService.shutdown();
  }
}

The pipeline under test is:

public final class ReadFiles {
  private static final Logger logger = LoggerFactory.getLogger(ReadFiles.class);
  private static final DateTimeFormatter dateTimeFormatter =
      DateTimeFormat.forPattern("yyyy-MM-dd");

  /** Runs the pipeline locally. */
  public static void main(String[] unused) {
    String[] args =
        new String[] {
          "--runner=FlinkRunner", "--flinkMaster=localhost:8082", "--checkpointingInterval=60000"
        };
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);
    logger.info("running");

    run(pipeline, "/tmp/input/*", Duration.millis(500), Growth.never(), "/tmp/output/result");

    pipeline.run();
  }

  /**
   * Continuously matches the given {@code filepattern} using {@code pollInterval} and {@code
   * terminationCondition}. The results are written to {@code outputFilenamePrefix}.
   *
   * <p>Note that it can be implemented as a {@code PTransform<PBegin, PDone>}.
   */
  @VisibleForTesting
  static void run(
      Pipeline pipeline,
      String filepattern,
      Duration pollInterval,
      TerminationCondition terminationCondition,
      String outputFilenamePrefix) {
    PCollection<KV<String, Long>> output =
        pipeline.apply("GetData", new GetData(filepattern, pollInterval, terminationCondition));

    output
        .apply("Window", Window.into(FixedWindows.of(Duration.standardDays(1))))
        .apply(Sum.longsPerKey())
        .apply(
            "FormatResults",
            MapElements.into(TypeDescriptors.strings())
                .via((KV<String, Long> kv) -> String.format("%s,%s", kv.getKey(), kv.getValue())))
        .apply(TextIO.write().to(outputFilenamePrefix).withWindowedWrites().withNumShards(1));
  }

  /**
   * Continuously matches the given {@code filepattern} using {@code pollInterval} and {@code
   * terminationCondition}.
   */
  @VisibleForTesting
  static class GetData extends PTransform<PBegin, PCollection<KV<String, Long>>> {

    private final String filepattern;
    private final Duration pollInterval;
    private final TerminationCondition terminationCondition;

    GetData(String filepattern, Duration pollInterval, TerminationCondition terminationCondition) {
      this.filepattern = filepattern;
      this.pollInterval = pollInterval;
      this.terminationCondition = terminationCondition;
    }

    @Override
    public PCollection<KV<String, Long>> expand(PBegin input) {
      final Growth<String, MatchResult.Metadata, MatchResult.Metadata> stringMetadataStringGrowth =
          Watch.growthOf(new MatchPollFn())
              .withPollInterval(pollInterval)
              .withTerminationPerInput(terminationCondition);
      return input
          .apply("Create filepattern", Create.<String>of(filepattern))
          .apply("Continuously match filepatterns", stringMetadataStringGrowth)
          .apply(Values.create())
          .apply(ParDo.of(new ReadFileFn()));
    }
  }

  /**
   * Outputs hardcoded values, or nothing if the file name ends with ".complete". The event times
   * and watermarks are unchanged.
   */
  private static final class ReadFileFn extends DoFn<Metadata, KV<String, Long>> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Metadata metadata = c.element();
      String filename = metadata.resourceId().toString();
      /*
       * Note that we could have used the {@link Filter} transform before calling ReadFileFn to
       * achieve the same semantics.
       */
      if (filename.endsWith(".complete")) {
        return;
      }
      c.output(KV.of("my-key", 1L));
      c.output(KV.of("my-key", 2L));
    }
  }

  /**
   * Matches input filepattern and outputs matched file {@code Metadata}. The timestamps of the
   * values outputted are based on the filenames, which are assumed to be either yyyy-MM-dd or
   * yyyy-MM-dd.complete. The watermark is set to the maximum of all the outputted event timestamps.
   */
  private static class MatchPollFn extends PollFn<String, Metadata> {
    private static final Logger logger =
        LoggerFactory.getLogger(MatchPollFn.class);

    @Override
    public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
        throws Exception {
      final List<Metadata> metadataList =
          FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata();
      List<TimestampedValue<Metadata>> outputs = new LinkedList<>();
      Instant watermark = Instant.EPOCH;
      for (Metadata metadata : metadataList) {
        String filename = metadata.resourceId().toString();
        final Instant timestamp = getTimestamp(filename);
        outputs.add(TimestampedValue.of(metadata, timestamp));
        if (timestamp.isAfter(watermark)) {
          watermark = timestamp;
        }
      }
      logger.info("outputting watermark {}", watermark);
      return Watch.Growth.PollResult.incomplete(outputs).withWatermark(watermark);
    }
  }

  /**
   * Returns {@link Instant} based on the {@code filepath}. Filename is assumed to be either
   * yyyy-MM-dd or yyyy-MM-dd.complete. The returned value is "yyyy-MM-ddT00:00:00.000Z" for
   * yyyy-MM-dd and "yyyy-MM-ddT00:00:00.000Z"+24h for yyyy-MM-dd.complete.
   */
  public static Instant getTimestamp(String filepath) {
    String filename = Paths.get(filepath).getFileName().toString();
    int index = filename.lastIndexOf(".complete");
    if (index != -1) {
      // In the case it has a suffix, strip it.
      filename = filename.substring(0, index);
    }
    Instant timestamp = Instant.parse(filename, dateTimeFormatter);
    if (index != -1) {
      // In the case it has a suffix i.e. it is complete, fast forward to the next day.
      return timestamp.plus(Duration.standardDays(1));
    }
    return timestamp;
  }
}
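
To spell out the intent behind getTimestamp and the one-day FixedWindows:
a data file is stamped with the start of its day, and its .complete marker
with the start of the next day, which is the end of the data file's window.
The sketch below reproduces that arithmetic with java.time only (no Beam or
Joda types). DailyWindowSketch, timestampOf and windowStart are hypothetical
names, timestampOf takes a bare file name without a directory part, and
windowStart assumes epoch-aligned daily windows with no offset.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

/** Sketch of the file-name timestamp rule and the daily window it falls into. */
final class DailyWindowSketch {
  static final String SUFFIX = ".complete";

  /** Start of the day for yyyy-MM-dd, start of the next day for yyyy-MM-dd.complete. */
  static Instant timestampOf(String filename) {
    boolean complete = filename.endsWith(SUFFIX);
    String day =
        complete ? filename.substring(0, filename.length() - SUFFIX.length()) : filename;
    Instant start = LocalDate.parse(day).atStartOfDay(ZoneOffset.UTC).toInstant();
    return complete ? start.plus(Duration.ofDays(1)) : start;
  }

  /** Start of the one-day, epoch-aligned window that contains the given timestamp. */
  static Instant windowStart(Instant timestamp) {
    long dayMillis = Duration.ofDays(1).toMillis();
    long millis = timestamp.toEpochMilli();
    return Instant.ofEpochMilli(millis - Math.floorMod(millis, dayMillis));
  }

  public static void main(String[] args) {
    Instant data = timestampOf("2020-01-01");
    Instant marker = timestampOf("2020-01-01.complete");

    // The data file sits at the very start of its own daily window.
    System.out.println(windowStart(data).equals(data)); // true

    // The marker's timestamp equals the (exclusive) end of that window.
    System.out.println(marker.equals(windowStart(data).plus(Duration.ofDays(1)))); // true
  }
}
```

As far as I understand, a watermark equal to the marker's timestamp sits
exactly at the end of the first day's window, which is what the test above
relies on to get the first result file.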

I can see that the Watch class is marked as
@Experimental(Kind.SPLITTABLE_DO_FN); however, according to the Capability
Matrix <https://beam.apache.org/documentation/runners/capability-matrix/>,
Splittable DoFns are supported by Flink.

Do you see an issue with our implementation? I would appreciate your input
and suggestions on how to proceed.

Thank you!


On Fri, Oct 23, 2020 at 12:00 PM Piotr Filipiuk <pi...@gmail.com>
wrote:

> Here is the simplified code that works e2e:
>
> static class GetData extends PTransform<PBegin, PCollection<KV<String, Long>>> {
>
>   private final String filepattern;
>   private final Duration pollInterval;
>   private final TerminationCondition terminationCondition;
>
>   GetData(String filepattern, Duration pollInterval, TerminationCondition terminationCondition) {
>     this.filepattern = filepattern;
>     this.pollInterval = pollInterval;
>     this.terminationCondition = terminationCondition;
>   }
>
>   @Override
>   public PCollection<KV<String, Long>> expand(PBegin input) {
>     final Growth<String, MatchResult.Metadata, MatchResult.Metadata> stringMetadataStringGrowth =
>         Watch.growthOf(new MatchPollFn())
>             .withPollInterval(pollInterval)
>             .withTerminationPerInput(terminationCondition);
>     return input
>         .apply("Create filepattern", Create.<String>of(filepattern))
>         .apply("Continuously match filepatterns", stringMetadataStringGrowth)
>         .apply(Values.create())
>         .apply(ParDo.of(new ReadFileFn()));
>   }
> }
>
> /**
>  * Outputs hardcoded values, or nothing if the file name ends with ".complete". The event times
>  * and watermarks are unchanged.
>  */
> private static final class ReadFileFn extends DoFn<Metadata, KV<String, Long>> {
>   @ProcessElement
>   public void processElement(ProcessContext c) {
>     Metadata metadata = c.element();
>     String filename = metadata.resourceId().toString();
>     if (filename.endsWith(".complete")) {
>       return;
>     }
>     c.output(KV.of("my-key", 1L));
>     c.output(KV.of("my-key", 2L));
>   }
> }
>
> /**
>  * Matches input filepattern and outputs matched file {@code Metadata}. The timestamps of the
>  * values outputted are based on the filenames, which are assumed to be either yyyy-MM-dd or
>  * yyyy-MM-dd.complete. The watermark is set to the maximum of all the outputted event timestamps.
>  */
> private static class MatchPollFn extends PollFn<String, Metadata> {
>   private static final Logger logger = LoggerFactory.getLogger(MatchPollFn.class);
>
>   @Override
>   public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
>       throws Exception {
>     final List<Metadata> metadataList =
>         FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata();
>     List<TimestampedValue<Metadata>> outputs = new LinkedList<>();
>     Instant watermark = Instant.EPOCH;
>     for (Metadata metadata : metadataList) {
>       String filename = metadata.resourceId().toString();
>       final Instant timestamp = getTimestamp(filename);
>       outputs.add(TimestampedValue.of(metadata, timestamp));
>       if (timestamp.isAfter(watermark)) {
>         watermark = timestamp;
>       }
>     }
>     logger.info("outputting watermark {}", watermark);
>     return Watch.Growth.PollResult.incomplete(outputs).withWatermark(watermark);
>   }
> }
>
> /**
>  * Returns {@link Instant} based on the {@code filepath}. Filename is assumed to be either
>  * yyyy-MM-dd or yyyy-MM-dd.complete. The returned value is "yyyy-MM-ddT00:00:00.000Z" for
>  * yyyy-MM-dd and "yyyy-MM-ddT00:00:00.000Z"+24h for yyyy-MM-dd.complete.
>  */
> public static Instant getTimestamp(String filepath) {
>   String filename = Paths.get(filepath).getFileName().toString();
>   int index = filename.lastIndexOf(".complete");
>   if (index != -1) {
>     // In the case it has a suffix, strip it.
>     filename = filename.substring(0, index);
>   }
>   Instant timestamp = Instant.parse(filename, dateTimeFormatter);
>   if (index != -1) {
>     // In the case it has a suffix i.e. it is complete, fast forward to the next day.
>     return timestamp.plus(Duration.standardDays(1));
>   }
>   return timestamp;
> }
>
>
>
>
> On Mon, Oct 19, 2020 at 9:56 AM Luke Cwik <lc...@google.com> wrote:
>
>> For future reference, what did you have to change to get it to work?
>>
>> On Thu, Oct 15, 2020 at 2:40 PM Piotr Filipiuk <pi...@gmail.com>
>> wrote:
>>
>>> Made it work e2e. Thank you all for the help!
>>>
>>> On Wed, Oct 14, 2020 at 3:48 PM Piotr Filipiuk <pi...@gmail.com>
>>> wrote:
>>>
>>>> Got it, thank you for the clarification.
>>>>
>>>> I tried to run the pipeline locally, with the following main (see full
>>>> source code attached):
>>>>
>>>> public static void main(String[] args) {
>>>>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>>>>   Pipeline pipeline = Pipeline.create(options);
>>>>   logger.info("running");
>>>>
>>>>   PCollection<KV<String, Long>> output =
>>>>       FileProcessing.getData(
>>>>           pipeline, "/tmp/input/*", Duration.standardSeconds(1), Growth.never());
>>>>
>>>>   output
>>>>       .apply("Window", Window.into(FixedWindows.of(Duration.standardDays(1))))
>>>>       .apply("LogWindowed", Log.ofElements("testGetData"))
>>>>       .apply(Sum.longsPerKey())
>>>>       .apply(
>>>>           "FormatResults",
>>>>           MapElements.into(TypeDescriptors.strings())
>>>>               .via((KV<String, Long> kv) -> String.format("{},{}", kv.getKey(), kv.getValue())))
>>>>       .apply("LogResults", Log.ofElements("results"))
>>>>       .apply(
>>>>           TextIO.write()
>>>>               .to(Paths.get("/tmp/output/").resolve("Results").toString())
>>>>               .withWindowedWrites()
>>>>               .withNumShards(1));
>>>>
>>>>   pipeline.run();
>>>> }
>>>>
>>>>
>>>> Then I am generating files using:
>>>>
>>>> for i in {01..30}; do echo "handling $i"; echo "1\n2\n3\n4" > /tmp/input/1985-10-$i; sleep 2; touch /tmp/input/1985-10-$i.complete; sleep 2; done
>>>>
>>>> I do not see any outputs being generated though. Can you elaborate why
>>>> that might be? I would suspect that once the watermark is set to day+1, the
>>>> results of the previous day should be finalized and hence the result for a
>>>> given window should be outputted.
>>>>
>>>> On Wed, Oct 14, 2020 at 1:41 PM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> I think you should be using the largest "complete" timestamp from the
>>>>> metadata results and not be setting the watermark if you don't have one.
>>>>>
>>>>> On Wed, Oct 14, 2020 at 11:47 AM Piotr Filipiuk <
>>>>> piotr.filipiuk@gmail.com> wrote:
>>>>>
>>>>>> Thank you so much for the input, that was extremely helpful!
>>>>>>
>>>>>> I changed the pipeline from using FileIO.match() into using a custom
>>>>>> matching (very similar to the FileIO.match()) that looks as follows:
>>>>>>
>>>>>> static PCollection<KV<String, Long>> getData(
>>>>>>     Pipeline pipeline,
>>>>>>     String filepattern,
>>>>>>     Duration pollInterval,
>>>>>>     TerminationCondition terminationCondition) {
>>>>>>   final Growth<String, MatchResult.Metadata, String> stringMetadataStringGrowth =
>>>>>>       Watch.growthOf(
>>>>>>               Contextful.of(new MatchPollFn(), Requirements.empty()), new ExtractFilenameFn())
>>>>>>           .withPollInterval(pollInterval)
>>>>>>           .withTerminationPerInput(terminationCondition);
>>>>>>   return pipeline
>>>>>>       .apply("Create filepattern", Create.<String>of(filepattern))
>>>>>>       .apply("Continuously match filepatterns", stringMetadataStringGrowth)
>>>>>>       .apply(Values.create())
>>>>>>       .apply(ParDo.of(new ReadFileFn()));
>>>>>> }
>>>>>>
>>>>>> private static class MatchPollFn extends PollFn<String, Metadata> {
>>>>>>   private static final Logger logger = LoggerFactory.getLogger(MatchPollFn.class);
>>>>>>
>>>>>>   @Override
>>>>>>   public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
>>>>>>       throws Exception {
>>>>>>     // Here we only have the filepattern i.e. element, and hence we do not know what the timestamp
>>>>>>     // and/or watermark should be. As a result, we output EPOCH as both the timestamp and the
>>>>>>     // watermark.
>>>>>>     Instant instant = Instant.EPOCH;
>>>>>>     return Watch.Growth.PollResult.incomplete(
>>>>>>             instant, FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata())
>>>>>>         .withWatermark(instant);
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> private static class ExtractFilenameFn implements SerializableFunction<Metadata, String> {
>>>>>>   @Override
>>>>>>   public String apply(MatchResult.Metadata input) {
>>>>>>     return input.resourceId().toString();
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> The above together with fixing the bugs that Luke pointed out (Thank
>>>>>> you Luke!), makes the unit test pass.
>>>>>>
>>>>>> Thank you again!
>>>>>>
>>>>>> If you have any feedback for the current code, I would appreciate it.
>>>>>> I am especially interested whether setting event time and watermark in
>>>>>> *MatchPollFn* to *EPOCH* is a correct way to go.
>>>>>>
>>>>>>
>>>>>> On Wed, Oct 14, 2020 at 9:49 AM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> FYI this is a major limitation in FileIO.match's watermarking
>>>>>>> ability. I believe there is a JIRA issue about this, but nobody has ever
>>>>>>> worked on improving it.
>>>>>>>
>>>>>>> On Wed, Oct 14, 2020 at 9:38 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>>> FileIO.match doesn't allow one to configure how the watermark
>>>>>>>> advances and it assumes that the watermark during polling is always the
>>>>>>>> current system time[1].
>>>>>>>>
>>>>>>>> Because of this the downstream watermark advancement is limited.
>>>>>>>> When an element and restriction starts processing, the maximum you can hold
>>>>>>>> the output watermark back by for this element and restriction pair is
>>>>>>>> limited to the current input watermark (a common value to use is the
>>>>>>>> current element's timestamp as the lower bound for all future output but if
>>>>>>>> that element is late the output you produce may or may not be late (depends
>>>>>>>> on downstream windowing strategy)). Holding this watermark back is
>>>>>>>> important since many of these elements and restrictions could be processed
>>>>>>>> in parallel at different rates.
>>>>>>>>
>>>>>>>> Based upon your implementation, you wouldn't need to control the
>>>>>>>> watermark from the file reading splittable DoFn if FileIO.match allowed you
>>>>>>>> to say what the watermark is after each polling round and allowed you to
>>>>>>>> set the timestamp for each match found. This initial setting of the
>>>>>>>> watermark during polling would be properly handled by the runner to block
>>>>>>>> watermark advancement for those elements.
>>>>>>>>
>>>>>>>> Minor comments not related to your issue but would improve your
>>>>>>>> implementation:
>>>>>>>> 1) Typically you set the watermark right before returning. You are
>>>>>>>> missing this from the failed tryClaim loop return.
>>>>>>>> 2) You should structure your loop not based upon the end of the
>>>>>>>> current restriction but continue processing till tryClaim fails. For
>>>>>>>> example:
>>>>>>>>       @ProcessElement
>>>>>>>>       public void processElement(
>>>>>>>>           @Element String fileName,
>>>>>>>>           RestrictionTracker<OffsetRange, Long> tracker,
>>>>>>>>           OutputReceiver<Integer> outputReceiver) throws IOException {
>>>>>>>>         RandomAccessFile file = new RandomAccessFile(fileName, "r");
>>>>>>>>         seekToNextRecordBoundaryInFile(file, tracker.currentRestriction().getFrom());
>>>>>>>>         while (tracker.tryClaim(file.getFilePointer())) {
>>>>>>>>           outputReceiver.output(readNextRecord(file));
>>>>>>>>         }
>>>>>>>>       }
>>>>>>>> 3) ensureTimestampWithinBounds is dangerous as you're masking a
>>>>>>>> possible data issue, such as the code having parsed some filename incorrectly.
>>>>>>>> It is likely that you copied this from Beam code and it is used there
>>>>>>>> because user implementations of UnboundedSource were incorrectly setting
>>>>>>>> the watermark outside of the bounds and there is no way to fix them.
>>>>>>>>
>>>>>>>> 1:
>>>>>>>> https://github.com/apache/beam/blob/29787b38b594e29428adaf230b45f9b33e24fa66/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L666
>>>>>>>>
>>>>>>>> On Tue, Oct 13, 2020 at 6:04 PM Piotr Filipiuk <
>>>>>>>> piotr.filipiuk@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thank you for a quick response. I tried to follow the doc attached
>>>>>>>>> and read existing Beam code that uses the Splittable DoFns, and I made some
>>>>>>>>> progress.
>>>>>>>>>
>>>>>>>>> I created a simple pipeline that matches given filepattern, and
>>>>>>>>> uses splittable dofn to control event times and watermarks. The pipeline
>>>>>>>>> expects files with the following name patterns:
>>>>>>>>>
>>>>>>>>> *yyyy-MM-dd*
>>>>>>>>>
>>>>>>>>> *yyyy-MM-dd.complete*
>>>>>>>>>
>>>>>>>>> Every time it sees *yyyy-MM-dd*, it reads its contents and
>>>>>>>>> outputs lines of the file using *outputWithTimestamp(...,
>>>>>>>>> timestamp)*. Additionally, it calls
>>>>>>>>> *watermarkEstimator.setWatermark(timestamp)*. In both cases the
>>>>>>>>> timestamp is *yyyy-MM-ddT00:00:00.000Z*.
>>>>>>>>>
>>>>>>>>> Once the pipeline matches *yyyy-MM-dd.complete* (which is empty)
>>>>>>>>> it calls *watermarkEstimator.setWatermark(timestamp)*, where
>>>>>>>>> timestamp is *yyyy-MM-ddT00:00:00.000Z plus one day* - hence it
>>>>>>>>> advances to the next day.
>>>>>>>>>
>>>>>>>>> I am at the point where the following unit test fails the
>>>>>>>>> inWindow() assertions, while the last assertion passes. It seems that
>>>>>>>>> even though I call watermarkEstimator.setWatermark() the window is not
>>>>>>>>> being closed.
>>>>>>>>>
>>>>>>>>> I would appreciate help/suggestions on what I am missing.
>>>>>>>>>
>>>>>>>>> Here is a unit test. The function being tested is getData()
>>>>>>>>> defined below.
>>>>>>>>>
>>>>>>>>> public void testGetDataWithNewFiles() throws InterruptedException {
>>>>>>>>>   final Duration duration = Duration.standardDays(1);
>>>>>>>>>
>>>>>>>>>   IntervalWindow firstWindow =
>>>>>>>>>       new IntervalWindow(Instant.parse("2020-01-01T00:00:00.000Z"), duration);
>>>>>>>>>   logger.info("first window {}", firstWindow);
>>>>>>>>>   IntervalWindow secondWindow =
>>>>>>>>>       new IntervalWindow(Instant.parse("2020-01-02T00:00:00.000Z"), duration);
>>>>>>>>>   logger.info("second window {}", secondWindow);
>>>>>>>>>
>>>>>>>>>   MatchConfiguration matchConfiguration =
>>>>>>>>>       MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)
>>>>>>>>>           .continuously(
>>>>>>>>>               Duration.millis(100),
>>>>>>>>>               Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(5)));
>>>>>>>>>
>>>>>>>>>   PCollection<KV<String, Long>> output =
>>>>>>>>>       FileProcessing.getData(p, tmpFolder.getRoot().getAbsolutePath() + "/*", matchConfiguration)
>>>>>>>>>           .apply("Window", Window.into(FixedWindows.of(duration)))
>>>>>>>>>           .apply("LogWindowedResult", Log.ofElements("testGetData"));
>>>>>>>>>
>>>>>>>>>   assertEquals(PCollection.IsBounded.UNBOUNDED, output.isBounded());
>>>>>>>>>
>>>>>>>>>   Thread writer =
>>>>>>>>>       new Thread(
>>>>>>>>>           () -> {
>>>>>>>>>             try {
>>>>>>>>>               Thread.sleep(1000);
>>>>>>>>>
>>>>>>>>>               Path firstPath = tmpFolder.newFile("2020-01-01").toPath();
>>>>>>>>>               Files.write(firstPath, Arrays.asList("1", "2", "3"));
>>>>>>>>>
>>>>>>>>>               Thread.sleep(1000);
>>>>>>>>>
>>>>>>>>>               Path firstPathComplete = tmpFolder.newFile("2020-01-01.complete").toPath();
>>>>>>>>>               Files.write(firstPathComplete, Arrays.asList());
>>>>>>>>>
>>>>>>>>>               Thread.sleep(1000);
>>>>>>>>>
>>>>>>>>>               Path secondPath = tmpFolder.newFile("2020-01-02").toPath();
>>>>>>>>>               Files.write(secondPath, Arrays.asList("4", "5", "6"));
>>>>>>>>>
>>>>>>>>>               Thread.sleep(1000);
>>>>>>>>>
>>>>>>>>>               Path secondPathComplete = tmpFolder.newFile("2020-01-02.complete").toPath();
>>>>>>>>>               Files.write(secondPathComplete, Arrays.asList());
>>>>>>>>>
>>>>>>>>>             } catch (IOException | InterruptedException e) {
>>>>>>>>>               throw new RuntimeException(e);
>>>>>>>>>             }
>>>>>>>>>           });
>>>>>>>>>   writer.start();
>>>>>>>>>
>>>>>>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>>>>>   PAssert.that(output)
>>>>>>>>>       .inWindow(firstWindow)
>>>>>>>>>       .containsInAnyOrder(KV.of("my-key", 1L), KV.of("my-key", 2L), KV.of("my-key", 3L));
>>>>>>>>>
>>>>>>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>>>>>   PAssert.that(output)
>>>>>>>>>       .inWindow(secondWindow)
>>>>>>>>>       .containsInAnyOrder(KV.of("my-key", 4L), KV.of("my-key", 5L), KV.of("my-key", 6L));
>>>>>>>>>
>>>>>>>>>   // THIS ASSERTION PASSES.
>>>>>>>>>   PAssert.that(output)
>>>>>>>>>       .containsInAnyOrder(
>>>>>>>>>           KV.of("my-key", 1L),
>>>>>>>>>           KV.of("my-key", 2L),
>>>>>>>>>           KV.of("my-key", 3L),
>>>>>>>>>           KV.of("my-key", 4L),
>>>>>>>>>           KV.of("my-key", 5L),
>>>>>>>>>           KV.of("my-key", 6L));
>>>>>>>>>
>>>>>>>>>   p.run();
>>>>>>>>>
>>>>>>>>>   writer.join();
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Here is the code. Essentially, I am using *FileIO.match()* to
>>>>>>>>> match filepattern. Then the file *Metadata* is processed by my
>>>>>>>>> custom Splittable DoFn.
>>>>>>>>>
>>>>>>>>> static PCollection<KV<String, Long>> getData(
>>>>>>>>>     Pipeline pipeline, String filepattern, MatchConfiguration matchConfiguration) {
>>>>>>>>>   PCollection<Metadata> matches =
>>>>>>>>>       pipeline.apply(
>>>>>>>>>           FileIO.match().filepattern(filepattern).withConfiguration(matchConfiguration));
>>>>>>>>>   return matches.apply(ParDo.of(new ReadFileFn())).apply(Log.ofElements("Get Data"));
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> /**
>>>>>>>>>  * Processes matched files by outputting key-value pairs where key is equal to "my-key" and values
>>>>>>>>>  * are Long values corresponding to the lines in the file. In the case file does not contain one
>>>>>>>>>  * Long per line, IOException is thrown.
>>>>>>>>>  */
>>>>>>>>> @DoFn.BoundedPerElement
>>>>>>>>> private static final class ReadFileFn extends DoFn<Metadata, KV<String, Long>> {
>>>>>>>>>   private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);
>>>>>>>>>
>>>>>>>>>   @ProcessElement
>>>>>>>>>   public void processElement(
>>>>>>>>>       ProcessContext c,
>>>>>>>>>       RestrictionTracker<OffsetRange, Long> tracker,
>>>>>>>>>       ManualWatermarkEstimator<Instant> watermarkEstimator)
>>>>>>>>>       throws IOException {
>>>>>>>>>     Metadata metadata = c.element();
>>>>>>>>>     logger.info(
>>>>>>>>>         "reading {} with restriction {} @ {}",
>>>>>>>>>         metadata,
>>>>>>>>>         tracker.currentRestriction(),
>>>>>>>>>         c.timestamp());
>>>>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>>>>     Instant timestamp = getTimestamp(filename);
>>>>>>>>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>>>>>>>       String line;
>>>>>>>>>       for (long lineNumber = 0; (line = br.readLine()) != null; ++lineNumber) {
>>>>>>>>>         if (lineNumber < tracker.currentRestriction().getFrom()
>>>>>>>>>             || lineNumber >= tracker.currentRestriction().getTo()) {
>>>>>>>>>           continue;
>>>>>>>>>         }
>>>>>>>>>         if (!tracker.tryClaim(lineNumber)) {
>>>>>>>>>           logger.info("failed to claim {}", lineNumber);
>>>>>>>>>           return;
>>>>>>>>>         }
>>>>>>>>>         c.outputWithTimestamp(KV.of("my-key", Long.parseLong(line)), timestamp);
>>>>>>>>>       }
>>>>>>>>>     }
>>>>>>>>>     logger.info("setting watermark to {}", timestamp);
>>>>>>>>>     watermarkEstimator.setWatermark(timestamp);
>>>>>>>>>     logger.info("Finish processing {} in file {}", tracker.currentRestriction(), filename);
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   private Instant getTimestamp(String filepath) {
>>>>>>>>>     // Filename is assumed to be either yyyy-MM-dd or yyyy-MM-dd.complete.
>>>>>>>>>     String filename = Paths.get(filepath).getFileName().toString();
>>>>>>>>>     int index = filename.lastIndexOf(".complete");
>>>>>>>>>     if (index != -1) {
>>>>>>>>>       // In the case it has a suffix, strip it.
>>>>>>>>>       filename = filename.substring(0, index);
>>>>>>>>>     }
>>>>>>>>>     Instant timestamp =
>>>>>>>>>         Instant.parse(new StringBuilder().append(filename).append("T00:00:00.000Z").toString());
>>>>>>>>>     if (index != -1) {
>>>>>>>>>       // In the case it has a suffix i.e. it is complete, fast forward to the next day.
>>>>>>>>>       return timestamp.plus(Duration.standardDays(1));
>>>>>>>>>     }
>>>>>>>>>     return timestamp;
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   @GetInitialRestriction
>>>>>>>>>   public OffsetRange getInitialRestriction(@Element Metadata metadata) throws IOException {
>>>>>>>>>     long lineCount;
>>>>>>>>>     try (Stream<String> stream = Files.lines(Paths.get(metadata.resourceId().toString()))) {
>>>>>>>>>       lineCount = stream.count();
>>>>>>>>>     }
>>>>>>>>>     return new OffsetRange(0L, lineCount);
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   @GetInitialWatermarkEstimatorState
>>>>>>>>>   public Instant getInitialWatermarkEstimatorState(
>>>>>>>>>       @Element Metadata metadata, @Restriction OffsetRange restriction) {
>>>>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>>>>     logger.info("getInitialWatermarkEstimatorState {}", filename);
>>>>>>>>>     // Compute and return the initial watermark estimator state for each element and restriction.
>>>>>>>>>     // All subsequent processing of an element and restriction will be restored from the existing
>>>>>>>>>     // state.
>>>>>>>>>     return getTimestamp(filename);
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   private static Instant ensureTimestampWithinBounds(Instant timestamp) {
>>>>>>>>>     if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
>>>>>>>>>       timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
>>>>>>>>>     } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
>>>>>>>>>       timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
>>>>>>>>>     }
>>>>>>>>>     return timestamp;
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   @NewWatermarkEstimator
>>>>>>>>>   public WatermarkEstimators.Manual newWatermarkEstimator(
>>>>>>>>>       @WatermarkEstimatorState Instant watermarkEstimatorState) {
>>>>>>>>>     logger.info("newWatermarkEstimator {}", watermarkEstimatorState);
>>>>>>>>>     return new WatermarkEstimators.Manual(ensureTimestampWithinBounds(watermarkEstimatorState));
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 8, 2020 at 2:15 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> I'm working on a blog post[1] about splittable dofns that covers
>>>>>>>>>> this topic.
>>>>>>>>>>
>>>>>>>>>> The TLDR; is that FileIO.match() should allow users to control
>>>>>>>>>> the watermark estimator that is used and for your use case you should hold
>>>>>>>>>> the watermark to some computable value (e.g. the files are generated every
>>>>>>>>>> hour so once you know the last file has appeared for that hour you advance
>>>>>>>>>> the watermark to the current hour).
>>>>>>>>>>
>>>>>>>>>> 1:
>>>>>>>>>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#heading=h.fo3wm9qs0vql
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 8, 2020 at 1:55 PM Piotr Filipiuk <
>>>>>>>>>> piotr.filipiuk@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I am looking into:
>>>>>>>>>>> https://beam.apache.org/documentation/patterns/file-processing/
>>>>>>>>>>> since I would like to create a continuous pipeline that reads from files
>>>>>>>>>>> and assigns Event Times based on e.g. file metadata or actual data inside
>>>>>>>>>>> the file. For example:
>>>>>>>>>>>
>>>>>>>>>>> private static void run(String[] args) {
>>>>>>>>>>>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>>>>>>>>>>>   Pipeline pipeline = Pipeline.create(options);
>>>>>>>>>>>
>>>>>>>>>>>   PCollection<Metadata> matches = pipeline
>>>>>>>>>>>       .apply(FileIO.match()
>>>>>>>>>>>           .filepattern("/tmp/input/*")
>>>>>>>>>>>           .continuously(Duration.standardSeconds(15), Watch.Growth.never()));
>>>>>>>>>>>   matches
>>>>>>>>>>>       .apply(ParDo.of(new ReadFileFn()))
>>>>>>>>>>>
>>>>>>>>>>>   pipeline.run();
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> private static final class ReadFileFn extends DoFn<Metadata, String> {
>>>>>>>>>>>   private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);
>>>>>>>>>>>
>>>>>>>>>>>   @ProcessElement
>>>>>>>>>>>   public void processElement(ProcessContext c) throws IOException {
>>>>>>>>>>>     Metadata metadata = c.element();
>>>>>>>>>>>     // I believe c.timestamp() is based on processing time.
>>>>>>>>>>>     logger.info("reading {} @ {}", metadata, c.timestamp());
>>>>>>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>>>>>>     // Output timestamps must be no earlier than the timestamp of the
>>>>>>>>>>>     // current input minus the allowed skew (0 milliseconds).
>>>>>>>>>>>     Instant timestamp = new Instant(metadata.lastModifiedMillis());
>>>>>>>>>>>     logger.info("lastModified @ {}", timestamp);
>>>>>>>>>>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>>>>>>>>>       String line;
>>>>>>>>>>>       while ((line = br.readLine()) != null) {
>>>>>>>>>>>         c.outputWithTimestamp(line, c.timestamp());
>>>>>>>>>>>       }
>>>>>>>>>>>     }
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> The issue is that when calling c.outputWithTimestamp() I am
>>>>>>>>>>> getting:
>>>>>>>>>>>
>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Cannot output
>>>>>>>>>>> with timestamp 1970-01-01T00:00:00.000Z. Output timestamps must be no
>>>>>>>>>>> earlier than the timestamp of the current input (2020-10-08T20:39:44.286Z)
>>>>>>>>>>> minus the allowed skew (0 milliseconds). See the
>>>>>>>>>>> DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed
>>>>>>>>>>> skew.
>>>>>>>>>>>
>>>>>>>>>>> I believe this is because MatchPollFn.apply() uses Instant.now() as
>>>>>>>>>>> the event time for the PCollection<Metadata>. I can see that
>>>>>>>>>>> the call to continuously() makes the PCollection unbounded and
>>>>>>>>>>> assigns default Event Time. Without the call to continuously() I can assign
>>>>>>>>>>> the timestamps without problems either via c.outputWithTimestamp
>>>>>>>>>>> or WithTimestamp transform.
>>>>>>>>>>>
>>>>>>>>>>> I would like to know what is the way to fix the issue, and
>>>>>>>>>>> whether this use-case is currently supported in Beam.
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Piotr
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best regards,
>>>>>>>>> Piotr
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards,
>>>>>> Piotr
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Best regards,
>>>> Piotr
>>>>
>>>
>>>
>>> --
>>> Best regards,
>>> Piotr
>>>
>>
>
> --
> Best regards,
> Piotr
>


-- 
Best regards,
Piotr

Re: Processing files as they arrive with custom timestamps

Posted by Piotr Filipiuk <pi...@gmail.com>.
Here is the simplified code that works e2e:

static class GetData extends PTransform<PBegin, PCollection<KV<String, Long>>> {

  private final String filepattern;
  private final Duration pollInterval;
  private final TerminationCondition terminationCondition;

  GetData(
      String filepattern, Duration pollInterval, TerminationCondition terminationCondition) {
    this.filepattern = filepattern;
    this.pollInterval = pollInterval;
    this.terminationCondition = terminationCondition;
  }

  @Override
  public PCollection<KV<String, Long>> expand(PBegin input) {
    final Growth<String, MatchResult.Metadata, MatchResult.Metadata> stringMetadataStringGrowth =
        Watch.growthOf(new MatchPollFn())
            .withPollInterval(pollInterval)
            .withTerminationPerInput(terminationCondition);
    return input
        .apply("Create filepattern", Create.<String>of(filepattern))
        .apply("Continuously match filepatterns", stringMetadataStringGrowth)
        .apply(Values.create())
        .apply(ParDo.of(new ReadFileFn()));
  }
}

/**
 * Outputs hardcoded values, or nothing if the file name ends with ".complete". The event times
 * and watermarks are unchanged.
 */
private static final class ReadFileFn extends DoFn<Metadata, KV<String, Long>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    Metadata metadata = c.element();
    String filename = metadata.resourceId().toString();
    if (filename.endsWith(".complete")) {
      return;
    }
    c.output(KV.of("my-key", 1L));
    c.output(KV.of("my-key", 2L));
  }
}

/**
 * Matches the input filepattern and outputs the matched file {@code Metadata}. The timestamps of
 * the output values are based on the filenames, which are assumed to be either yyyy-MM-dd or
 * yyyy-MM-dd.complete. The watermark is set to the maximum of all the output event timestamps.
 */
private static class MatchPollFn extends PollFn<String, Metadata> {
  private static final Logger logger = LoggerFactory.getLogger(MatchPollFn.class);

  @Override
  public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
      throws Exception {
    final List<Metadata> metadataList =
        FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata();
    List<TimestampedValue<Metadata>> outputs = new LinkedList<>();
    Instant watermark = Instant.EPOCH;
    for (Metadata metadata : metadataList) {
      String filename = metadata.resourceId().toString();
      final Instant timestamp = getTimestamp(filename);
      outputs.add(TimestampedValue.of(metadata, timestamp));
      if (timestamp.isAfter(watermark)) {
        watermark = timestamp;
      }
    }
    logger.info("outputting watermark {}", watermark);
    return Watch.Growth.PollResult.incomplete(outputs).withWatermark(watermark);
  }
}
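
Luke's suggestion quoted further down (use the largest "complete" timestamp and do not set the watermark when there is none) can be sketched on its own. This is only an illustration under stated assumptions, not the code from the pipeline above: it uses java.time instead of the Joda Instant that Beam uses so that it runs without dependencies, and the class and method names (CompleteWatermark, watermarkFromCompleteMarkers) are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class CompleteWatermark {
  private static final String COMPLETE_SUFFIX = ".complete";

  /**
   * Returns the start of the day after the latest yyyy-MM-dd.complete marker, or empty if no
   * marker is present. Plain yyyy-MM-dd data files do not influence the result.
   */
  static Optional<Instant> watermarkFromCompleteMarkers(List<String> filenames) {
    return filenames.stream()
        .filter(name -> name.endsWith(COMPLETE_SUFFIX))
        // Strip the suffix so that only the yyyy-MM-dd part remains.
        .map(name -> name.substring(0, name.length() - COMPLETE_SUFFIX.length()))
        // A completed day allows the watermark to move to the start of the next day (UTC).
        .map(day -> LocalDate.parse(day).plusDays(1).atStartOfDay(ZoneOffset.UTC).toInstant())
        .max(Comparator.naturalOrder());
  }

  public static void main(String[] args) {
    List<String> polled = Arrays.asList("2020-01-01", "2020-01-01.complete", "2020-01-02");
    System.out.println(watermarkFromCompleteMarkers(polled));
    System.out.println(watermarkFromCompleteMarkers(Arrays.asList("2020-01-01")));
  }
}
```

In a PollFn, an empty result would mean returning the poll result without calling withWatermark(...), so that the watermark is held where it was.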

/**
 * Returns {@link Instant} based on the {@code filepath}. Filename is assumed to be either
 * yyyy-MM-dd or yyyy-MM-dd.complete. The returned value is "yyyy-MM-ddT00:00:00.000Z" for
 * yyyy-MM-dd and "yyyy-MM-ddT00:00:00.000Z"+24h for yyyy-MM-dd.complete.
 */
public static Instant getTimestamp(String filepath) {
  String filename = Paths.get(filepath).getFileName().toString();
  int index = filename.lastIndexOf(".complete");
  if (index != -1) {
    // In the case it has a suffix, strip it.
    filename = filename.substring(0, index);
  }
  // dateTimeFormatter is not defined in this snippet; it is assumed to parse yyyy-MM-dd.
  Instant timestamp = Instant.parse(filename, dateTimeFormatter);
  if (index != -1) {
    // In the case it has a suffix i.e. it is complete, fast forward to the next day.
    return timestamp.plus(Duration.standardDays(1));
  }
  return timestamp;
}
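
For anyone who wants to try the filename-to-timestamp mapping in isolation, here is a minimal, dependency-free sketch. It is an approximation of getTimestamp above, not the original: it uses java.time rather than Joda time, it assumes the formatter parses yyyy-MM-dd in UTC (the formatter is not shown in the message), and it checks the suffix with endsWith instead of lastIndexOf. The class name FilenameTimestamps is hypothetical.

```java
import java.nio.file.Paths;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class FilenameTimestamps {
  // Assumption: filenames carry an ISO date (yyyy-MM-dd) that is interpreted in UTC.
  private static final DateTimeFormatter DAY_FORMAT = DateTimeFormatter.ISO_LOCAL_DATE;
  private static final String COMPLETE_SUFFIX = ".complete";

  /** Maps yyyy-MM-dd to midnight UTC of that day and yyyy-MM-dd.complete to the next midnight. */
  static Instant getTimestamp(String filepath) {
    String filename = Paths.get(filepath).getFileName().toString();
    boolean complete = filename.endsWith(COMPLETE_SUFFIX);
    if (complete) {
      // Strip the suffix so that only the date part is parsed.
      filename = filename.substring(0, filename.length() - COMPLETE_SUFFIX.length());
    }
    // A malformed filename throws DateTimeParseException here instead of being clamped silently.
    LocalDate day = LocalDate.parse(filename, DAY_FORMAT);
    if (complete) {
      // The day is complete, so fast forward to the start of the next day.
      day = day.plusDays(1);
    }
    return day.atStartOfDay(ZoneOffset.UTC).toInstant();
  }

  public static void main(String[] args) {
    System.out.println(getTimestamp("/tmp/input/2020-01-01"));
    System.out.println(getTimestamp("/tmp/input/2020-01-01.complete"));
  }
}
```

Letting the parse failure propagate is in line with the earlier comment in this thread that clamping timestamps can mask an incorrectly parsed filename.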




On Mon, Oct 19, 2020 at 9:56 AM Luke Cwik <lc...@google.com> wrote:

> For future reference, what did you have to change to get it to work?
>
> On Thu, Oct 15, 2020 at 2:40 PM Piotr Filipiuk <pi...@gmail.com>
> wrote:
>
>> Made it work e2e. Thank you all for the help!
>>
>> On Wed, Oct 14, 2020 at 3:48 PM Piotr Filipiuk <pi...@gmail.com>
>> wrote:
>>
>>> Got it, thank you for the clarification.
>>>
>>> I tried to run the pipeline locally, with the following main (see full
>>> source code attached):
>>>
>>> public static void main(String[] args) {
>>>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>>>   Pipeline pipeline = Pipeline.create(options);
>>>   logger.info("running");
>>>
>>>   PCollection<KV<String, Long>> output =
>>>       FileProcessing.getData(
>>>           pipeline, "/tmp/input/*", Duration.standardSeconds(1), Growth.never());
>>>
>>>   output
>>>       .apply("Window", Window.into(FixedWindows.of(Duration.standardDays(1))))
>>>       .apply("LogWindowed", Log.ofElements("testGetData"))
>>>       .apply(Sum.longsPerKey())
>>>       .apply(
>>>           "FormatResults",
>>>           MapElements.into(TypeDescriptors.strings())
>>>               .via((KV<String, Long> kv) -> String.format("%s,%s", kv.getKey(), kv.getValue())))
>>>       .apply("LogResults", Log.ofElements("results"))
>>>       .apply(
>>>           TextIO.write()
>>>               .to(Paths.get("/tmp/output/").resolve("Results").toString())
>>>               .withWindowedWrites()
>>>               .withNumShards(1));
>>>
>>>   pipeline.run();
>>> }
>>>
>>>
>>> Then I am generating files using:
>>>
>>> for i in {01..30}; do
>>>   echo "handling $i"
>>>   printf '1\n2\n3\n4\n' > /tmp/input/1985-10-$i
>>>   sleep 2
>>>   touch /tmp/input/1985-10-$i.complete
>>>   sleep 2
>>> done
>>>
>>> I do not see any outputs being generated though. Can you elaborate why
>>> that might be? I would suspect that once the watermark is set to day+1, the
>>> results of the previous day should be finalized and hence the result for a
>>> given window should be outputted.
>>>
>>> On Wed, Oct 14, 2020 at 1:41 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> I think you should be using the largest "complete" timestamp from the
>>>> metadata results and not be setting the watermark if you don't have one.
>>>>
>>>> On Wed, Oct 14, 2020 at 11:47 AM Piotr Filipiuk <
>>>> piotr.filipiuk@gmail.com> wrote:
>>>>
>>>>> Thank you so much for the input, that was extremely helpful!
>>>>>
>>>>> I changed the pipeline from using FileIO.match() to using custom
>>>>> matching (very similar to FileIO.match()) that looks as follows:
>>>>>
>>>>> static PCollection<KV<String, Long>> getData(
>>>>>     Pipeline pipeline,
>>>>>     String filepattern,
>>>>>     Duration pollInterval,
>>>>>     TerminationCondition terminationCondition) {
>>>>>   final Growth<String, MatchResult.Metadata, String> stringMetadataStringGrowth =
>>>>>       Watch.growthOf(
>>>>>               Contextful.of(new MatchPollFn(), Requirements.empty()), new ExtractFilenameFn())
>>>>>           .withPollInterval(pollInterval)
>>>>>           .withTerminationPerInput(terminationCondition);
>>>>>   return pipeline
>>>>>       .apply("Create filepattern", Create.<String>of(filepattern))
>>>>>       .apply("Continuously match filepatterns", stringMetadataStringGrowth)
>>>>>       .apply(Values.create())
>>>>>       .apply(ParDo.of(new ReadFileFn()));
>>>>> }
>>>>>
>>>>> private static class MatchPollFn extends PollFn<String, Metadata> {
>>>>>   private static final Logger logger = LoggerFactory.getLogger(MatchPollFn.class);
>>>>>
>>>>>   @Override
>>>>>   public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
>>>>>       throws Exception {
>>>>>     // Here we only have the filepattern i.e. element, and hence we do not know what the timestamp
>>>>>     // and/or watermark should be. As a result, we output EPOCH as both the timestamp and the
>>>>>     // watermark.
>>>>>     Instant instant = Instant.EPOCH;
>>>>>     return Watch.Growth.PollResult.incomplete(
>>>>>             instant, FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata())
>>>>>         .withWatermark(instant);
>>>>>   }
>>>>> }
>>>>>
>>>>> private static class ExtractFilenameFn implements SerializableFunction<Metadata, String> {
>>>>>   @Override
>>>>>   public String apply(MatchResult.Metadata input) {
>>>>>     return input.resourceId().toString();
>>>>>   }
>>>>> }
>>>>>
>>>>> The above together with fixing the bugs that Luke pointed out (Thank
>>>>> you Luke!), makes the unit test pass.
>>>>>
>>>>> Thank you again!
>>>>>
>>>>> If you have any feedback for the current code, I would appreciate it.
>>>>> I am especially interested whether setting event time and watermark in
>>>>> *MatchPollFn* to *EPOCH* is a correct way to go.
>>>>>
>>>>>
>>>>> On Wed, Oct 14, 2020 at 9:49 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> FYI this is a major limitation in FileIO.match's watermarking
>>>>>> ability. I believe there is a JIRA issue about this, but nobody has ever
>>>>>> worked on improving it.
>>>>>>
>>>>>> On Wed, Oct 14, 2020 at 9:38 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> FileIO.match doesn't allow one to configure how the watermark
>>>>>>> advances and it assumes that the watermark during polling is always the
>>>>>>> current system time[1].
>>>>>>>
>>>>>>> Because of this the downstream watermark advancement is limited.
>>>>>>> When an element and restriction starts processing, the maximum you can hold
>>>>>>> the output watermark back by for this element and restriction pair is
>>>>>>> limited to the current input watermark (a common value to use is the
>>>>>>> current element's timestamp as the lower bound for all future output but if
>>>>>>> that element is late the output you produce may or may not be late (depends
>>>>>>> on downstream windowing strategy)). Holding this watermark back is
>>>>>>> important since many of these elements and restrictions could be processed
>>>>>>> in parallel at different rates.
>>>>>>>
>>>>>>> Based upon your implementation, you wouldn't need to control the
>>>>>>> watermark from the file reading splittable DoFn if FileIO.match allowed you
>>>>>>> to say what the watermark is after each polling round and allowed you to
>>>>>>> set the timestamp for each match found. This initial setting of the
>>>>>>> watermark during polling would be properly handled by the runner to block
>>>>>>> watermark advancement for those elements.
>>>>>>>
>>>>>>> Minor comments not related to your issue but would improve your
>>>>>>> implementation:
>>>>>>> 1) Typically you set the watermark right before returning. You are
>>>>>>> missing this from the failed tryClaim loop return.
>>>>>>> 2) You should structure your loop not based upon the end of the
>>>>>>> current restriction but continue processing till tryClaim fails. For
>>>>>>> example:
>>>>>>>       @ProcessElement
>>>>>>>       public void processElement(
>>>>>>>           @Element String fileName,
>>>>>>>           RestrictionTracker<OffsetRange, Long> tracker,
>>>>>>>           OutputReceiver<Integer> outputReceiver)
>>>>>>>           throws IOException {
>>>>>>>         RandomAccessFile file = new RandomAccessFile(fileName, "r");
>>>>>>>         seekToNextRecordBoundaryInFile(file, tracker.currentRestriction().getFrom());
>>>>>>>         while (tracker.tryClaim(file.getFilePointer())) {
>>>>>>>           outputReceiver.output(readNextRecord(file));
>>>>>>>         }
>>>>>>>       }
>>>>>>> 3) ensureTimestampWithinBounds is dangerous as you're masking a
>>>>>>> possible data issue, since the code may have parsed some filename incorrectly.
>>>>>>> It is likely that you copied this from Beam code and it is used there
>>>>>>> because user implementations of UnboundedSource were incorrectly setting
>>>>>>> the watermark outside of the bounds and there is no way to fix them.
>>>>>>>
>>>>>>> 1:
>>>>>>> https://github.com/apache/beam/blob/29787b38b594e29428adaf230b45f9b33e24fa66/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L666
>>>>>>>
>>>>>>> On Tue, Oct 13, 2020 at 6:04 PM Piotr Filipiuk <
>>>>>>> piotr.filipiuk@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thank you for a quick response. I tried to follow the doc attached
>>>>>>>> and read existing Beam code that uses the Splittable DoFns, and I made some
>>>>>>>> progress.
>>>>>>>>
>>>>>>>> I created a simple pipeline that matches given filepattern, and
>>>>>>>> uses splittable dofn to control event times and watermarks. The pipeline
>>>>>>>> expects files with the following name patterns:
>>>>>>>>
>>>>>>>> *yyyy-MM-dd*
>>>>>>>>
>>>>>>>> *yyyy-MM-dd.complete*
>>>>>>>>
>>>>>>>> Every time it sees *yyyy-MM-dd*, it reads its contents and outputs
>>>>>>>> lines of the file using *outputWithTimestamp(..., timestamp)*.
>>>>>>>> Additionally, it calls *watermarkEstimator.setWatermark(timestamp)*.
>>>>>>>> In both cases the timestamp is *yyyy-MM-ddT00:00:00.000Z*.
>>>>>>>>
>>>>>>>> Once the pipeline matches *yyyy-MM-dd.complete* (which is empty)
>>>>>>>> it calls *watermarkEstimator.setWatermark(timestamp)*, where
>>>>>>>> timestamp is *yyyy-MM-ddT00:00:00.000Z plus one day* - hence it
>>>>>>>> advances to the next day.
>>>>>>>>
>>>>>>>> I am at the point where the following unit test fails the inWindow()
>>>>>>>> assertions, while the last assertion passes. It seems that even though I
>>>>>>>> call watermarkEstimator.setWatermark() the window is not being closed.
>>>>>>>>
>>>>>>>> I would appreciate help/suggestions on what I am missing.
>>>>>>>>
>>>>>>>> Here is a unit test. The function being tested is getData() defined
>>>>>>>> below.
>>>>>>>>
>>>>>>>> public void testGetDataWithNewFiles() throws InterruptedException {
>>>>>>>>   final Duration duration = Duration.standardDays(1);
>>>>>>>>
>>>>>>>>   IntervalWindow firstWindow =
>>>>>>>>       new IntervalWindow(Instant.parse("2020-01-01T00:00:00.000Z"), duration);
>>>>>>>>   logger.info("first window {}", firstWindow);
>>>>>>>>   IntervalWindow secondWindow =
>>>>>>>>       new IntervalWindow(Instant.parse("2020-01-02T00:00:00.000Z"), duration);
>>>>>>>>   logger.info("second window {}", secondWindow);
>>>>>>>>
>>>>>>>>   MatchConfiguration matchConfiguration =
>>>>>>>>       MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)
>>>>>>>>           .continuously(
>>>>>>>>               Duration.millis(100),
>>>>>>>>               Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(5)));
>>>>>>>>
>>>>>>>>   PCollection<KV<String, Long>> output =
>>>>>>>>       FileProcessing.getData(p, tmpFolder.getRoot().getAbsolutePath() + "/*", matchConfiguration)
>>>>>>>>           .apply("Window", Window.into(FixedWindows.of(duration)))
>>>>>>>>           .apply("LogWindowedResult", Log.ofElements("testGetData"));
>>>>>>>>
>>>>>>>>   assertEquals(PCollection.IsBounded.UNBOUNDED, output.isBounded());
>>>>>>>>
>>>>>>>>   Thread writer =
>>>>>>>>       new Thread(
>>>>>>>>           () -> {
>>>>>>>>             try {
>>>>>>>>               Thread.sleep(1000);
>>>>>>>>
>>>>>>>>               Path firstPath = tmpFolder.newFile("2020-01-01").toPath();
>>>>>>>>               Files.write(firstPath, Arrays.asList("1", "2", "3"));
>>>>>>>>
>>>>>>>>               Thread.sleep(1000);
>>>>>>>>
>>>>>>>>               Path firstPathComplete = tmpFolder.newFile("2020-01-01.complete").toPath();
>>>>>>>>               Files.write(firstPathComplete, Arrays.asList());
>>>>>>>>
>>>>>>>>               Thread.sleep(1000);
>>>>>>>>
>>>>>>>>               Path secondPath = tmpFolder.newFile("2020-01-02").toPath();
>>>>>>>>               Files.write(secondPath, Arrays.asList("4", "5", "6"));
>>>>>>>>
>>>>>>>>               Thread.sleep(1000);
>>>>>>>>
>>>>>>>>               Path secondPathComplete = tmpFolder.newFile("2020-01-02.complete").toPath();
>>>>>>>>               Files.write(secondPathComplete, Arrays.asList());
>>>>>>>>
>>>>>>>>             } catch (IOException | InterruptedException e) {
>>>>>>>>               throw new RuntimeException(e);
>>>>>>>>             }
>>>>>>>>           });
>>>>>>>>   writer.start();
>>>>>>>>
>>>>>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>>>>   PAssert.that(output)
>>>>>>>>       .inWindow(firstWindow)
>>>>>>>>       .containsInAnyOrder(KV.of("my-key", 1L), KV.of("my-key", 2L), KV.of("my-key", 3L));
>>>>>>>>
>>>>>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>>>>   PAssert.that(output)
>>>>>>>>       .inWindow(secondWindow)
>>>>>>>>       .containsInAnyOrder(KV.of("my-key", 4L), KV.of("my-key", 5L), KV.of("my-key", 6L));
>>>>>>>>
>>>>>>>>   // THIS ASSERTION PASSES.
>>>>>>>>   PAssert.that(output)
>>>>>>>>       .containsInAnyOrder(
>>>>>>>>           KV.of("my-key", 1L),
>>>>>>>>           KV.of("my-key", 2L),
>>>>>>>>           KV.of("my-key", 3L),
>>>>>>>>           KV.of("my-key", 4L),
>>>>>>>>           KV.of("my-key", 5L),
>>>>>>>>           KV.of("my-key", 6L));
>>>>>>>>
>>>>>>>>   p.run();
>>>>>>>>
>>>>>>>>   writer.join();
>>>>>>>> }
>>>>>>>>
>>>>>>>> Here is the code. Essentially, I am using *FileIO.match()* to
>>>>>>>> match filepattern. Then the file *Metadata* is processed by my
>>>>>>>> custom Splittable DoFn.
>>>>>>>>
>>>>>>>> static PCollection<KV<String, Long>> getData(
>>>>>>>>     Pipeline pipeline, String filepattern, MatchConfiguration matchConfiguration) {
>>>>>>>>   PCollection<Metadata> matches =
>>>>>>>>       pipeline.apply(
>>>>>>>>           FileIO.match().filepattern(filepattern).withConfiguration(matchConfiguration));
>>>>>>>>   return matches.apply(ParDo.of(new ReadFileFn())).apply(Log.ofElements("Get Data"));
>>>>>>>> }
>>>>>>>>
>>>>>>>> /**
>>>>>>>>  * Processes matched files by outputting key-value pairs where key is equal to "my-key" and values
>>>>>>>>  * are Long values corresponding to the lines in the file. In the case file does not contain one
>>>>>>>>  * Long per line, IOException is thrown.
>>>>>>>>  */
>>>>>>>> @DoFn.BoundedPerElement
>>>>>>>> private static final class ReadFileFn extends DoFn<Metadata, KV<String, Long>> {
>>>>>>>>   private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);
>>>>>>>>
>>>>>>>>   @ProcessElement
>>>>>>>>   public void processElement(
>>>>>>>>       ProcessContext c,
>>>>>>>>       RestrictionTracker<OffsetRange, Long> tracker,
>>>>>>>>       ManualWatermarkEstimator<Instant> watermarkEstimator)
>>>>>>>>       throws IOException {
>>>>>>>>     Metadata metadata = c.element();
>>>>>>>>     logger.info(
>>>>>>>>         "reading {} with restriction {} @ {}",
>>>>>>>>         metadata,
>>>>>>>>         tracker.currentRestriction(),
>>>>>>>>         c.timestamp());
>>>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>>>     Instant timestamp = getTimestamp(filename);
>>>>>>>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>>>>>>       String line;
>>>>>>>>       for (long lineNumber = 0; (line = br.readLine()) != null; ++lineNumber) {
>>>>>>>>         if (lineNumber < tracker.currentRestriction().getFrom()
>>>>>>>>             || lineNumber >= tracker.currentRestriction().getTo()) {
>>>>>>>>           continue;
>>>>>>>>         }
>>>>>>>>         if (!tracker.tryClaim(lineNumber)) {
>>>>>>>>           logger.info("failed to claim {}", lineNumber);
>>>>>>>>           return;
>>>>>>>>         }
>>>>>>>>         c.outputWithTimestamp(KV.of("my-key", Long.parseLong(line)), timestamp);
>>>>>>>>       }
>>>>>>>>     }
>>>>>>>>     logger.info("setting watermark to {}", timestamp);
>>>>>>>>     watermarkEstimator.setWatermark(timestamp);
>>>>>>>>     logger.info("Finish processing {} in file {}", tracker.currentRestriction(), filename);
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   private Instant getTimestamp(String filepath) {
>>>>>>>>     // Filename is assumed to be either yyyy-MM-dd or yyyy-MM-dd.complete.
>>>>>>>>     String filename = Paths.get(filepath).getFileName().toString();
>>>>>>>>     int index = filename.lastIndexOf(".complete");
>>>>>>>>     if (index != -1) {
>>>>>>>>       // In the case it has a suffix, strip it.
>>>>>>>>       filename = filename.substring(0, index);
>>>>>>>>     }
>>>>>>>>     Instant timestamp =
>>>>>>>>         Instant.parse(new StringBuilder().append(filename).append("T00:00:00.000Z").toString());
>>>>>>>>     if (index != -1) {
>>>>>>>>       // In the case it has a suffix i.e. it is complete, fast forward to the next day.
>>>>>>>>       return timestamp.plus(Duration.standardDays(1));
>>>>>>>>     }
>>>>>>>>     return timestamp;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @GetInitialRestriction
>>>>>>>>   public OffsetRange getInitialRestriction(@Element Metadata metadata) throws IOException {
>>>>>>>>     long lineCount;
>>>>>>>>     try (Stream<String> stream = Files.lines(Paths.get(metadata.resourceId().toString()))) {
>>>>>>>>       lineCount = stream.count();
>>>>>>>>     }
>>>>>>>>     return new OffsetRange(0L, lineCount);
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @GetInitialWatermarkEstimatorState
>>>>>>>>   public Instant getInitialWatermarkEstimatorState(
>>>>>>>>       @Element Metadata metadata, @Restriction OffsetRange restriction) {
>>>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>>>     logger.info("getInitialWatermarkEstimatorState {}", filename);
>>>>>>>>     // Compute and return the initial watermark estimator state for each element and restriction.
>>>>>>>>     // All subsequent processing of an element and restriction will be restored from the existing
>>>>>>>>     // state.
>>>>>>>>     return getTimestamp(filename);
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   private static Instant ensureTimestampWithinBounds(Instant timestamp) {
>>>>>>>>     if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
>>>>>>>>       timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
>>>>>>>>     } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
>>>>>>>>       timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
>>>>>>>>     }
>>>>>>>>     return timestamp;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @NewWatermarkEstimator
>>>>>>>>   public WatermarkEstimators.Manual newWatermarkEstimator(
>>>>>>>>       @WatermarkEstimatorState Instant watermarkEstimatorState) {
>>>>>>>>     logger.info("newWatermarkEstimator {}", watermarkEstimatorState);
>>>>>>>>     return new WatermarkEstimators.Manual(ensureTimestampWithinBounds(watermarkEstimatorState));
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 8, 2020 at 2:15 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>
>>>>>>>>> I'm working on a blog post[1] about splittable dofns that covers
>>>>>>>>> this topic.
>>>>>>>>>
>>>>>>>>> The TLDR; is that FileIO.match() should allow users to control the
>>>>>>>>> watermark estimator that is used and for your use case you should hold the
>>>>>>>>> watermark to some computable value (e.g. the files are generated every hour
>>>>>>>>> so once you know the last file has appeared for that hour you advance the
>>>>>>>>> watermark to the current hour).
>>>>>>>>>
>>>>>>>>> 1:
>>>>>>>>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#heading=h.fo3wm9qs0vql
>>>>>>>>>
>>>>>>>>> On Thu, Oct 8, 2020 at 1:55 PM Piotr Filipiuk <
>>>>>>>>> piotr.filipiuk@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I am looking into:
>>>>>>>>>> https://beam.apache.org/documentation/patterns/file-processing/
>>>>>>>>>> since I would like to create a continuous pipeline that reads from files
>>>>>>>>>> and assigns Event Times based on e.g. file metadata or actual data inside
>>>>>>>>>> the file. For example:
>>>>>>>>>>
>>>>>>>>>> private static void run(String[] args) {
>>>>>>>>>>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>>>>>>>>>>   Pipeline pipeline = Pipeline.create(options);
>>>>>>>>>>
>>>>>>>>>>   PCollection<Metadata> matches = pipeline
>>>>>>>>>>       .apply(FileIO.match()
>>>>>>>>>>           .filepattern("/tmp/input/*")
>>>>>>>>>>           .continuously(Duration.standardSeconds(15), Watch.Growth.never()));
>>>>>>>>>>   matches
>>>>>>>>>>       .apply(ParDo.of(new ReadFileFn()));
>>>>>>>>>>
>>>>>>>>>>   pipeline.run();
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> private static final class ReadFileFn extends DoFn<Metadata, String> {
>>>>>>>>>>   private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);
>>>>>>>>>>
>>>>>>>>>>   @ProcessElement
>>>>>>>>>>   public void processElement(ProcessContext c) throws IOException {
>>>>>>>>>>     Metadata metadata = c.element();
>>>>>>>>>>     // I believe c.timestamp() is based on processing time.
>>>>>>>>>>     logger.info("reading {} @ {}", metadata, c.timestamp());
>>>>>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>>>>>     // Output timestamps must be no earlier than the timestamp of the
>>>>>>>>>>     // current input minus the allowed skew (0 milliseconds).
>>>>>>>>>>     Instant timestamp = new Instant(metadata.lastModifiedMillis());
>>>>>>>>>>     logger.info("lastModified @ {}", timestamp);
>>>>>>>>>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>>>>>>>>       String line;
>>>>>>>>>>       while ((line = br.readLine()) != null) {
>>>>>>>>>>         c.outputWithTimestamp(line, c.timestamp());
>>>>>>>>>>       }
>>>>>>>>>>     }
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> The issue is that when calling c.outputWithTimestamp() I am
>>>>>>>>>> getting:
>>>>>>>>>>
>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Cannot output with
>>>>>>>>>> timestamp 1970-01-01T00:00:00.000Z. Output timestamps must be no earlier
>>>>>>>>>> than the timestamp of the current input (2020-10-08T20:39:44.286Z) minus
>>>>>>>>>> the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
>>>>>>>>>> Javadoc for details on changing the allowed skew.
>>>>>>>>>>
>>>>>>>>>> I believe this is because MatchPollFn.apply() uses Instant.now() as
>>>>>>>>>> the event time for the PCollection<Metadata>. I can see that the
>>>>>>>>>> call to continuously() makes the PCollection unbounded and
>>>>>>>>>> assigns default Event Time. Without the call to continuously() I can assign
>>>>>>>>>> the timestamps without problems either via c.outputWithTimestamp
>>>>>>>>>> or WithTimestamp transform.
>>>>>>>>>>
>>>>>>>>>> I would like to know what is the way to fix the issue, and
>>>>>>>>>> whether this use-case is currently supported in Beam.
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best regards,
>>>>>>>>>> Piotr
>>>>>>>>>>

-- 
Best regards,
Piotr
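
The ensureTimestampWithinBounds helper quoted above silently clamps out-of-range timestamps. A minimal sketch of a fail-fast alternative follows, so that a wrongly parsed filename surfaces as an error rather than as a clamped watermark. It uses java.time only so that it runs without Beam; MIN_TIMESTAMP and MAX_TIMESTAMP are hypothetical stand-ins for BoundedWindow.TIMESTAMP_MIN_VALUE and BoundedWindow.TIMESTAMP_MAX_VALUE (not Beam's actual values), and TimestampBounds and requireWithinBounds are illustrative names that do not appear in the thread.

```java
import java.time.Instant;

public class TimestampBounds {
  // Hypothetical bounds standing in for BoundedWindow.TIMESTAMP_MIN_VALUE and
  // BoundedWindow.TIMESTAMP_MAX_VALUE; these are NOT the values Beam uses.
  static final Instant MIN_TIMESTAMP = Instant.parse("0001-01-01T00:00:00Z");
  static final Instant MAX_TIMESTAMP = Instant.parse("9999-12-31T23:59:59Z");

  /**
   * Returns the timestamp unchanged when it lies within the bounds and throws
   * otherwise, instead of clamping it. The source argument (e.g. the filename the
   * timestamp was parsed from) is only used to make the error message useful.
   */
  public static Instant requireWithinBounds(Instant timestamp, String source) {
    if (timestamp.isBefore(MIN_TIMESTAMP) || timestamp.isAfter(MAX_TIMESTAMP)) {
      throw new IllegalArgumentException(
          "Timestamp " + timestamp + " derived from '" + source + "' is out of bounds");
    }
    return timestamp;
  }

  public static void main(String[] args) {
    // An in-range timestamp passes through untouched.
    System.out.println(requireWithinBounds(Instant.parse("2020-01-01T00:00:00Z"), "2020-01-01"));
  }
}
```

Whether failing the bundle is preferable to clamping depends on the pipeline; the sketch only illustrates the trade-off raised in the thread.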

Re: Processing files as they arrive with custom timestamps

Posted by Luke Cwik <lc...@google.com>.
For future reference, what did you have to change to get it to work?

On Thu, Oct 15, 2020 at 2:40 PM Piotr Filipiuk <pi...@gmail.com>
wrote:

> Made it work e2e. Thank you all for the help!
>
> On Wed, Oct 14, 2020 at 3:48 PM Piotr Filipiuk <pi...@gmail.com>
> wrote:
>
>> Got it, thank you for the clarification.
>>
>> I tried to run the pipeline locally, with the following main (see full
>> source code attached):
>>
>> public static void main(String[] args) {
>>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>>   Pipeline pipeline = Pipeline.create(options);
>>   logger.info("running");
>>
>>   PCollection<KV<String, Long>> output =
>>       FileProcessing.getData(
>>           pipeline, "/tmp/input/*", Duration.standardSeconds(1), Growth.never());
>>
>>   output
>>       .apply("Window", Window.into(FixedWindows.of(Duration.standardDays(1))))
>>       .apply("LogWindowed", Log.ofElements("testGetData"))
>>       .apply(Sum.longsPerKey())
>>       .apply(
>>           "FormatResults",
>>           MapElements.into(TypeDescriptors.strings())
>>               .via((KV<String, Long> kv) -> String.format("%s,%s", kv.getKey(), kv.getValue())))
>>       .apply("LogResults", Log.ofElements("results"))
>>       .apply(
>>           TextIO.write()
>>               .to(Paths.get("/tmp/output/").resolve("Results").toString())
>>               .withWindowedWrites()
>>               .withNumShards(1));
>>
>>   pipeline.run();
>> }
>>
>>
>> Then I am generating files using:
>>
>> for i in {01..30}; do
>>   echo "handling $i"
>>   printf '1\n2\n3\n4\n' > /tmp/input/1985-10-$i
>>   sleep 2
>>   touch /tmp/input/1985-10-$i.complete
>>   sleep 2
>> done
>>
>> I do not see any output being generated, though. Can you elaborate on why
>> that might be? I would expect that once the watermark is set to day+1, the
>> results of the previous day are finalized and hence the result for that
>> window is emitted.
>>
>> On Wed, Oct 14, 2020 at 1:41 PM Luke Cwik <lc...@google.com> wrote:
>>
>>> I think you should be using the largest "complete" timestamp from the
>>> metadata results and not be setting the watermark if you don't have one.
>>>
>>> On Wed, Oct 14, 2020 at 11:47 AM Piotr Filipiuk <
>>> piotr.filipiuk@gmail.com> wrote:
>>>
>>>> Thank you so much for the input, that was extremely helpful!
>>>>
>>>> I changed the pipeline from using FileIO.match() to using a custom
>>>> matching transform (very similar to FileIO.match()) that looks as follows:
>>>>
>>>> static PCollection<KV<String, Long>> getData(
>>>>     Pipeline pipeline,
>>>>     String filepattern,
>>>>     Duration pollInterval,
>>>>     TerminationCondition terminationCondition) {
>>>>   final Growth<String, MatchResult.Metadata, String> stringMetadataStringGrowth =
>>>>       Watch.growthOf(
>>>>               Contextful.of(new MatchPollFn(), Requirements.empty()), new ExtractFilenameFn())
>>>>           .withPollInterval(pollInterval)
>>>>           .withTerminationPerInput(terminationCondition);
>>>>   return pipeline
>>>>       .apply("Create filepattern", Create.<String>of(filepattern))
>>>>       .apply("Continuously match filepatterns", stringMetadataStringGrowth)
>>>>       .apply(Values.create())
>>>>       .apply(ParDo.of(new ReadFileFn()));
>>>> }
>>>>
>>>> private static class MatchPollFn extends PollFn<String, Metadata> {
>>>>   private static final Logger logger = LoggerFactory.getLogger(MatchPollFn.class);
>>>>
>>>>   @Override
>>>>   public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
>>>>       throws Exception {
>>>>     // Here we only have the filepattern i.e. element, and hence we do not know what the timestamp
>>>>     // and/or watermark should be. As a result, we output EPOCH as both the timestamp and the
>>>>     // watermark.
>>>>     Instant instant = Instant.EPOCH;
>>>>     return Watch.Growth.PollResult.incomplete(
>>>>             instant, FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata())
>>>>         .withWatermark(instant);
>>>>   }
>>>> }
>>>>
>>>> private static class ExtractFilenameFn implements SerializableFunction<Metadata, String> {
>>>>   @Override
>>>>   public String apply(MatchResult.Metadata input) {
>>>>     return input.resourceId().toString();
>>>>   }
>>>> }
>>>>
>>>> The above, together with fixing the bugs that Luke pointed out (thank
>>>> you, Luke!), makes the unit test pass.
>>>>
>>>> Thank you again!
>>>>
>>>> If you have any feedback on the current code, I would appreciate it. I
>>>> am especially interested in whether setting the event time and watermark in
>>>> *MatchPollFn* to *EPOCH* is the correct way to go.
>>>>
>>>>
>>>> On Wed, Oct 14, 2020 at 9:49 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> FYI this is a major limitation in FileIO.match's watermarking ability.
>>>>> I believe there is a JIRA issue about this, but nobody has ever worked on
>>>>> improving it.
>>>>>
>>>>> On Wed, Oct 14, 2020 at 9:38 AM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> FileIO.match doesn't allow one to configure how the watermark
>>>>>> advances and it assumes that the watermark during polling is always the
>>>>>> current system time[1].
>>>>>>
>>>>>> Because of this the downstream watermark advancement is limited. When
>>>>>> an element and restriction pair starts processing, the furthest back you
>>>>>> can hold the output watermark for that pair is the current input
>>>>>> watermark. A common choice is to use the current element's timestamp as
>>>>>> the lower bound for all future output, but if that element is late, the
>>>>>> output you produce may or may not be late, depending on the downstream
>>>>>> windowing strategy. Holding this watermark back is important since many
>>>>>> of these elements and restrictions could be processed in parallel at
>>>>>> different rates.
>>>>>>
>>>>>> Based upon your implementation, you wouldn't need to control the
>>>>>> watermark from the file reading splittable DoFn if FileIO.match allowed you
>>>>>> to say what the watermark is after each polling round and allowed you to
>>>>>> set the timestamp for each match found. This initial setting of the
>>>>>> watermark during polling would be properly handled by the runner to block
>>>>>> watermark advancement for those elements.
>>>>>>
>>>>>> Minor comments that are not related to your issue but would improve your
>>>>>> implementation:
>>>>>> 1) Typically you set the watermark right before returning. You are
>>>>>> missing this on the return path taken when tryClaim fails.
>>>>>> 2) You should structure your loop not based upon the end of the
>>>>>> current restriction but continue processing until tryClaim fails. For
>>>>>> example:
>>>>>>       @ProcessElement
>>>>>>       public void processElement(
>>>>>>           @Element String fileName,
>>>>>>           RestrictionTracker<OffsetRange, Long> tracker,
>>>>>>           OutputReceiver<Integer> outputReceiver)
>>>>>>           throws IOException {
>>>>>>         RandomAccessFile file = new RandomAccessFile(fileName, "r");
>>>>>>         seekToNextRecordBoundaryInFile(file, tracker.currentRestriction().getFrom());
>>>>>>         while (tracker.tryClaim(file.getFilePointer())) {
>>>>>>           outputReceiver.output(readNextRecord(file));
>>>>>>         }
>>>>>>       }
>>>>>> 3) ensureTimestampWithinBounds is dangerous as you're masking a
>>>>>> possible data issue, since the code may have parsed some filename
>>>>>> incorrectly. It is likely that you copied this from Beam code, where it is
>>>>>> used because user implementations of UnboundedSource were incorrectly
>>>>>> setting the watermark outside of the bounds and there is no way to fix them.
>>>>>>
>>>>>> 1:
>>>>>> https://github.com/apache/beam/blob/29787b38b594e29428adaf230b45f9b33e24fa66/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L666
>>>>>>
>>>>>> On Tue, Oct 13, 2020 at 6:04 PM Piotr Filipiuk <
>>>>>> piotr.filipiuk@gmail.com> wrote:
>>>>>>
>>>>>>> Thank you for a quick response. I tried to follow the doc attached
>>>>>>> and read existing Beam code that uses the Splittable DoFns, and I made some
>>>>>>> progress.
>>>>>>>
>>>>>>> I created a simple pipeline that matches a given filepattern and uses a
>>>>>>> splittable DoFn to control event times and watermarks. The pipeline expects
>>>>>>> files with the following name patterns:
>>>>>>>
>>>>>>> *yyyy-MM-dd*
>>>>>>>
>>>>>>> *yyyy-MM-dd.complete*
>>>>>>>
>>>>>>> Every time it sees *yyyy-MM-dd*, it reads its contents and outputs
>>>>>>> lines of the file using *outputWithTimestamp(..., timestamp)*.
>>>>>>> Additionally, it calls *watermarkEstimator.setWatermark(timestamp)*.
>>>>>>> In both cases the timestamp is *yyyy-MM-ddT00:00:00.000Z*.
>>>>>>>
>>>>>>> Once the pipeline matches *yyyy-MM-dd.complete* (which is empty) it
>>>>>>> calls *watermarkEstimator.setWatermark(timestamp)*, where timestamp
>>>>>>> is *yyyy-MM-ddT00:00:00.000Z plus one day* - hence it advances to
>>>>>>> the next day.
>>>>>>>
>>>>>>> I am at the point where the following unit test fails the inWindow()
>>>>>>> assertions, while the last assertion passes. It seems that even though I
>>>>>>> call watermarkEstimator.setWatermark(), the window is not being closed.
>>>>>>>
>>>>>>> I would appreciate help/suggestions on what I am missing.
>>>>>>>
>>>>>>> Here is a unit test. The function being tested is getData() defined
>>>>>>> below.
>>>>>>>
>>>>>>> public void testGetDataWithNewFiles() throws InterruptedException {
>>>>>>>   final Duration duration = Duration.standardDays(1);
>>>>>>>
>>>>>>>   IntervalWindow firstWindow =
>>>>>>>       new IntervalWindow(Instant.parse("2020-01-01T00:00:00.000Z"), duration);
>>>>>>>   logger.info("first window {}", firstWindow);
>>>>>>>   IntervalWindow secondWindow =
>>>>>>>       new IntervalWindow(Instant.parse("2020-01-02T00:00:00.000Z"), duration);
>>>>>>>   logger.info("second window {}", secondWindow);
>>>>>>>
>>>>>>>   MatchConfiguration matchConfiguration =
>>>>>>>       MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)
>>>>>>>           .continuously(
>>>>>>>               Duration.millis(100),
>>>>>>>               Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(5)));
>>>>>>>
>>>>>>>   PCollection<KV<String, Long>> output =
>>>>>>>       FileProcessing.getData(p, tmpFolder.getRoot().getAbsolutePath() + "/*", matchConfiguration)
>>>>>>>           .apply("Window", Window.into(FixedWindows.of(duration)))
>>>>>>>           .apply("LogWindowedResult", Log.ofElements("testGetData"));
>>>>>>>
>>>>>>>   assertEquals(PCollection.IsBounded.UNBOUNDED, output.isBounded());
>>>>>>>
>>>>>>>   Thread writer =
>>>>>>>       new Thread(
>>>>>>>           () -> {
>>>>>>>             try {
>>>>>>>               Thread.sleep(1000);
>>>>>>>
>>>>>>>               Path firstPath = tmpFolder.newFile("2020-01-01").toPath();
>>>>>>>               Files.write(firstPath, Arrays.asList("1", "2", "3"));
>>>>>>>
>>>>>>>               Thread.sleep(1000);
>>>>>>>
>>>>>>>               Path firstPathComplete = tmpFolder.newFile("2020-01-01.complete").toPath();
>>>>>>>               Files.write(firstPathComplete, Arrays.asList());
>>>>>>>
>>>>>>>               Thread.sleep(1000);
>>>>>>>
>>>>>>>               Path secondPath = tmpFolder.newFile("2020-01-02").toPath();
>>>>>>>               Files.write(secondPath, Arrays.asList("4", "5", "6"));
>>>>>>>
>>>>>>>               Thread.sleep(1000);
>>>>>>>
>>>>>>>               Path secondPathComplete = tmpFolder.newFile("2020-01-02.complete").toPath();
>>>>>>>               Files.write(secondPathComplete, Arrays.asList());
>>>>>>>
>>>>>>>             } catch (IOException | InterruptedException e) {
>>>>>>>               throw new RuntimeException(e);
>>>>>>>             }
>>>>>>>           });
>>>>>>>   writer.start();
>>>>>>>
>>>>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>>>   PAssert.that(output)
>>>>>>>       .inWindow(firstWindow)
>>>>>>>       .containsInAnyOrder(KV.of("my-key", 1L), KV.of("my-key", 2L), KV.of("my-key", 3L));
>>>>>>>
>>>>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>>>   PAssert.that(output)
>>>>>>>       .inWindow(secondWindow)
>>>>>>>       .containsInAnyOrder(KV.of("my-key", 4L), KV.of("my-key", 5L), KV.of("my-key", 6L));
>>>>>>>
>>>>>>>   // THIS ASSERTION PASSES.
>>>>>>>   PAssert.that(output)
>>>>>>>       .containsInAnyOrder(
>>>>>>>           KV.of("my-key", 1L),
>>>>>>>           KV.of("my-key", 2L),
>>>>>>>           KV.of("my-key", 3L),
>>>>>>>           KV.of("my-key", 4L),
>>>>>>>           KV.of("my-key", 5L),
>>>>>>>           KV.of("my-key", 6L));
>>>>>>>
>>>>>>>   p.run();
>>>>>>>
>>>>>>>   writer.join();
>>>>>>> }
>>>>>>>
>>>>>>> Here is the code. Essentially, I am using *FileIO.match()* to match
>>>>>>> filepattern. Then the file *Metadata* is processed by my custom
>>>>>>> Splittable DoFn.
>>>>>>>
>>>>>>> static PCollection<KV<String, Long>> getData(
>>>>>>>     Pipeline pipeline, String filepattern, MatchConfiguration matchConfiguration) {
>>>>>>>   PCollection<Metadata> matches =
>>>>>>>       pipeline.apply(
>>>>>>>           FileIO.match().filepattern(filepattern).withConfiguration(matchConfiguration));
>>>>>>>   return matches.apply(ParDo.of(new ReadFileFn())).apply(Log.ofElements("Get Data"));
>>>>>>> }
>>>>>>>
>>>>>>> /**
>>>>>>>  * Processes matched files by outputting key-value pairs where key is equal to "my-key" and values
>>>>>>>  * are Long values corresponding to the lines in the file. If a line cannot be parsed as a Long,
>>>>>>>  * Long.parseLong throws NumberFormatException.
>>>>>>>  */
>>>>>>> @DoFn.BoundedPerElement
>>>>>>> private static final class ReadFileFn extends DoFn<Metadata, KV<String, Long>> {
>>>>>>>   private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);
>>>>>>>
>>>>>>>   @ProcessElement
>>>>>>>   public void processElement(
>>>>>>>       ProcessContext c,
>>>>>>>       RestrictionTracker<OffsetRange, Long> tracker,
>>>>>>>       ManualWatermarkEstimator<Instant> watermarkEstimator)
>>>>>>>       throws IOException {
>>>>>>>     Metadata metadata = c.element();
>>>>>>>     logger.info(
>>>>>>>         "reading {} with restriction {} @ {}",
>>>>>>>         metadata,
>>>>>>>         tracker.currentRestriction(),
>>>>>>>         c.timestamp());
>>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>>     Instant timestamp = getTimestamp(filename);
>>>>>>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>>>>>       String line;
>>>>>>>       for (long lineNumber = 0; (line = br.readLine()) != null; ++lineNumber) {
>>>>>>>         if (lineNumber < tracker.currentRestriction().getFrom()
>>>>>>>             || lineNumber >= tracker.currentRestriction().getTo()) {
>>>>>>>           continue;
>>>>>>>         }
>>>>>>>         if (!tracker.tryClaim(lineNumber)) {
>>>>>>>           logger.info("failed to claim {}", lineNumber);
>>>>>>>           return;
>>>>>>>         }
>>>>>>>         c.outputWithTimestamp(KV.of("my-key", Long.parseLong(line)), timestamp);
>>>>>>>       }
>>>>>>>     }
>>>>>>>     logger.info("setting watermark to {}", timestamp);
>>>>>>>     watermarkEstimator.setWatermark(timestamp);
>>>>>>>     logger.info("Finish processing {} in file {}", tracker.currentRestriction(), filename);
>>>>>>>   }
>>>>>>>
>>>>>>>   private Instant getTimestamp(String filepath) {
>>>>>>>     // Filename is assumed to be either yyyy-MM-dd or yyyy-MM-dd.complete.
>>>>>>>     String filename = Paths.get(filepath).getFileName().toString();
>>>>>>>     int index = filename.lastIndexOf(".complete");
>>>>>>>     if (index != -1) {
>>>>>>>       // In the case it has a suffix, strip it.
>>>>>>>       filename = filename.substring(0, index);
>>>>>>>     }
>>>>>>>     Instant timestamp =
>>>>>>>         Instant.parse(new StringBuilder().append(filename).append("T00:00:00.000Z").toString());
>>>>>>>     if (index != -1) {
>>>>>>>       // In the case it has a suffix i.e. it is complete, fast forward to the next day.
>>>>>>>       return timestamp.plus(Duration.standardDays(1));
>>>>>>>     }
>>>>>>>     return timestamp;
>>>>>>>   }
>>>>>>>
>>>>>>>   @GetInitialRestriction
>>>>>>>   public OffsetRange getInitialRestriction(@Element Metadata metadata) throws IOException {
>>>>>>>     long lineCount;
>>>>>>>     try (Stream<String> stream = Files.lines(Paths.get(metadata.resourceId().toString()))) {
>>>>>>>       lineCount = stream.count();
>>>>>>>     }
>>>>>>>     return new OffsetRange(0L, lineCount);
>>>>>>>   }
>>>>>>>
>>>>>>>   @GetInitialWatermarkEstimatorState
>>>>>>>   public Instant getInitialWatermarkEstimatorState(
>>>>>>>       @Element Metadata metadata, @Restriction OffsetRange restriction) {
>>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>>     logger.info("getInitialWatermarkEstimatorState {}", filename);
>>>>>>>     // Compute and return the initial watermark estimator state for each element and restriction.
>>>>>>>     // All subsequent processing of an element and restriction will be restored from the existing
>>>>>>>     // state.
>>>>>>>     return getTimestamp(filename);
>>>>>>>   }
>>>>>>>
>>>>>>>   private static Instant ensureTimestampWithinBounds(Instant timestamp) {
>>>>>>>     if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
>>>>>>>       timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
>>>>>>>     } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
>>>>>>>       timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
>>>>>>>     }
>>>>>>>     return timestamp;
>>>>>>>   }
>>>>>>>
>>>>>>>   @NewWatermarkEstimator
>>>>>>>   public WatermarkEstimators.Manual newWatermarkEstimator(
>>>>>>>       @WatermarkEstimatorState Instant watermarkEstimatorState) {
>>>>>>>     logger.info("newWatermarkEstimator {}", watermarkEstimatorState);
>>>>>>>     return new WatermarkEstimators.Manual(ensureTimestampWithinBounds(watermarkEstimatorState));
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Oct 8, 2020 at 2:15 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>>> I'm working on a blog post[1] about splittable dofns that covers
>>>>>>>> this topic.
>>>>>>>>
>>>>>>>> The TLDR; is that FileIO.match() should allow users to control the
>>>>>>>> watermark estimator that is used and for your use case you should hold the
>>>>>>>> watermark to some computable value (e.g. the files are generated every hour
>>>>>>>> so once you know the last file has appeared for that hour you advance the
>>>>>>>> watermark to the current hour).
>>>>>>>>
>>>>>>>> 1:
>>>>>>>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#heading=h.fo3wm9qs0vql
>>>>>>>>
>>>>>>>> On Thu, Oct 8, 2020 at 1:55 PM Piotr Filipiuk <
>>>>>>>> piotr.filipiuk@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am looking into:
>>>>>>>>> https://beam.apache.org/documentation/patterns/file-processing/
>>>>>>>>> since I would like to create a continuous pipeline that reads from files
>>>>>>>>> and assigns Event Times based on e.g. file metadata or actual data inside
>>>>>>>>> the file. For example:
>>>>>>>>>
>>>>>>>>> private static void run(String[] args) {
>>>>>>>>>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>>>>>>>>>   Pipeline pipeline = Pipeline.create(options);
>>>>>>>>>
>>>>>>>>>   PCollection<Metadata> matches = pipeline
>>>>>>>>>       .apply(FileIO.match()
>>>>>>>>>           .filepattern("/tmp/input/*")
>>>>>>>>>           .continuously(Duration.standardSeconds(15), Watch.Growth.never()));
>>>>>>>>>   matches
>>>>>>>>>       .apply(ParDo.of(new ReadFileFn()));
>>>>>>>>>
>>>>>>>>>   pipeline.run();
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> private static final class ReadFileFn extends DoFn<Metadata, String> {
>>>>>>>>>   private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);
>>>>>>>>>
>>>>>>>>>   @ProcessElement
>>>>>>>>>   public void processElement(ProcessContext c) throws IOException {
>>>>>>>>>     Metadata metadata = c.element();
>>>>>>>>>     // I believe c.timestamp() is based on processing time.
>>>>>>>>>     logger.info("reading {} @ {}", metadata, c.timestamp());
>>>>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>>>>     // Output timestamps must be no earlier than the timestamp of the
>>>>>>>>>     // current input minus the allowed skew (0 milliseconds).
>>>>>>>>>     Instant timestamp = new Instant(metadata.lastModifiedMillis());
>>>>>>>>>     logger.info("lastModified @ {}", timestamp);
>>>>>>>>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>>>>>>>       String line;
>>>>>>>>>       while ((line = br.readLine()) != null) {
>>>>>>>>>         c.outputWithTimestamp(line, c.timestamp());
>>>>>>>>>       }
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> The issue is that when calling c.outputWithTimestamp() I am
>>>>>>>>> getting:
>>>>>>>>>
>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Cannot output with
>>>>>>>>> timestamp 1970-01-01T00:00:00.000Z. Output timestamps must be no earlier
>>>>>>>>> than the timestamp of the current input (2020-10-08T20:39:44.286Z) minus
>>>>>>>>> the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
>>>>>>>>> Javadoc for details on changing the allowed skew.
>>>>>>>>>
>>>>>>>>> I believe this is because MatchPollFn.apply() uses Instant.now() as
>>>>>>>>> the event time for the PCollection<Metadata>. I can see that the
>>>>>>>>> call to continuously() makes the PCollection unbounded and
>>>>>>>>> assigns default Event Time. Without the call to continuously() I can assign
>>>>>>>>> the timestamps without problems either via c.outputWithTimestamp
>>>>>>>>> or WithTimestamp transform.
>>>>>>>>>
>>>>>>>>> I would like to know what is the way to fix the issue, and whether
>>>>>>>>> this use-case is currently supported in Beam.
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best regards,
>>>>>>>>> Piotr
>>>>>>>>>
>
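
The suggestion quoted above (use the largest "complete" timestamp from the match results, and do not set the watermark when there is none) could look roughly like the following. This is a sketch under assumptions, not the code that ended up working: it uses java.time instead of Joda-Time so that it runs without Beam, CompleteMarkers and latestCompleteWatermark are hypothetical names, and the files are assumed to follow the yyyy-MM-dd / yyyy-MM-dd.complete naming convention described in the thread.

```java
import java.nio.file.Paths;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class CompleteMarkers {
  private static final String SUFFIX = ".complete";

  /**
   * Returns midnight UTC of the day after the newest yyyy-MM-dd.complete marker,
   * or Optional.empty() when no marker has been matched yet.
   */
  public static Optional<Instant> latestCompleteWatermark(List<String> paths) {
    return paths.stream()
        // Keep only the last path segment, e.g. "2020-01-01.complete".
        .map(path -> Paths.get(path).getFileName().toString())
        .filter(name -> name.endsWith(SUFFIX))
        // LocalDate.parse throws DateTimeParseException on a malformed name.
        .map(name -> LocalDate.parse(name.substring(0, name.length() - SUFFIX.length())))
        .max(LocalDate::compareTo)
        // A completed day means the watermark may advance to the start of the next day.
        .map(day -> day.plusDays(1).atStartOfDay(ZoneOffset.UTC).toInstant());
  }

  public static void main(String[] args) {
    List<String> matched =
        Arrays.asList(
            "/tmp/input/2020-01-01", "/tmp/input/2020-01-01.complete", "/tmp/input/2020-01-02");
    // Only 2020-01-01 has a marker, so the result is the start of 2020-01-02.
    System.out.println(latestCompleteWatermark(matched));
    // No marker at all: the caller would leave the watermark untouched.
    System.out.println(latestCompleteWatermark(Arrays.asList("/tmp/input/2020-01-01")));
  }
}
```

In the MatchPollFn from the thread, the returned value would presumably be converted to a Joda Instant and passed to withWatermark only when it is present.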

Re: Processing files as they arrive with custom timestamps

Posted by Piotr Filipiuk <pi...@gmail.com>.
Made it work e2e. Thank you all for the help!

On Wed, Oct 14, 2020 at 3:48 PM Piotr Filipiuk <pi...@gmail.com>
wrote:

> Got it, thank you for the clarification.
>
> I tried to run the pipeline locally, with the following main (see full
> source code attached):
>
> public static void main(String[] args) {
>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>   Pipeline pipeline = Pipeline.create(options);
>   logger.info("running");
>
>   PCollection<KV<String, Long>> output =
>       FileProcessing.getData(
>           pipeline, "/tmp/input/*", Duration.standardSeconds(1), Growth.never());
>
>   output
>       .apply("Window", Window.into(FixedWindows.of(Duration.standardDays(1))))
>       .apply("LogWindowed", Log.ofElements("testGetData"))
>       .apply(Sum.longsPerKey())
>       .apply(
>           "FormatResults",
>           MapElements.into(TypeDescriptors.strings())
>               .via((KV<String, Long> kv) -> String.format("%s,%s", kv.getKey(), kv.getValue())))
>       .apply("LogResults", Log.ofElements("results"))
>       .apply(
>           TextIO.write()
>               .to(Paths.get("/tmp/output/").resolve("Results").toString())
>               .withWindowedWrites()
>               .withNumShards(1));
>
>   pipeline.run();
> }
>
>
> Then I am generating files using:
>
> for i in {01..30}; do
>   echo "handling $i"
>   printf '1\n2\n3\n4\n' > /tmp/input/1985-10-$i
>   sleep 2
>   touch /tmp/input/1985-10-$i.complete
>   sleep 2
> done
>
> I do not see any output being generated, though. Can you elaborate on why
> that might be? I would expect that once the watermark is set to day+1, the
> results of the previous day are finalized and hence the result for that
> window is emitted.
>
> On Wed, Oct 14, 2020 at 1:41 PM Luke Cwik <lc...@google.com> wrote:
>
>> I think you should be using the largest "complete" timestamp from the
>> metadata results and not be setting the watermark if you don't have one.
>>
>> On Wed, Oct 14, 2020 at 11:47 AM Piotr Filipiuk <pi...@gmail.com>
>> wrote:
>>
>>> Thank you so much for the input, that was extremely helpful!
>>>
>>> I changed the pipeline from using FileIO.match() to using a custom
>>> matching transform (very similar to FileIO.match()) that looks as follows:
>>>
>>> static PCollection<KV<String, Long>> getData(
>>>     Pipeline pipeline,
>>>     String filepattern,
>>>     Duration pollInterval,
>>>     TerminationCondition terminationCondition) {
>>>   final Growth<String, MatchResult.Metadata, String> stringMetadataStringGrowth =
>>>       Watch.growthOf(
>>>               Contextful.of(new MatchPollFn(), Requirements.empty()), new ExtractFilenameFn())
>>>           .withPollInterval(pollInterval)
>>>           .withTerminationPerInput(terminationCondition);
>>>   return pipeline
>>>       .apply("Create filepattern", Create.<String>of(filepattern))
>>>       .apply("Continuously match filepatterns", stringMetadataStringGrowth)
>>>       .apply(Values.create())
>>>       .apply(ParDo.of(new ReadFileFn()));
>>> }
>>>
>>> private static class MatchPollFn extends PollFn<String, Metadata> {
>>>   private static final Logger logger = LoggerFactory.getLogger(MatchPollFn.class);
>>>
>>>   @Override
>>>   public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
>>>       throws Exception {
>>>     // Here we only have the filepattern i.e. element, and hence we do not know what the timestamp
>>>     // and/or watermark should be. As a result, we output EPOCH as both the timestamp and the
>>>     // watermark.
>>>     Instant instant = Instant.EPOCH;
>>>     return Watch.Growth.PollResult.incomplete(
>>>             instant, FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata())
>>>         .withWatermark(instant);
>>>   }
>>> }
>>>
>>> private static class ExtractFilenameFn implements SerializableFunction<Metadata, String> {
>>>   @Override
>>>   public String apply(MatchResult.Metadata input) {
>>>     return input.resourceId().toString();
>>>   }
>>> }
>>>
>>> The above, together with fixing the bugs that Luke pointed out (thank
>>> you, Luke!), makes the unit test pass.
>>>
>>> Thank you again!
>>>
>>> If you have any feedback on the current code, I would appreciate it. I
>>> am especially interested in whether setting the event time and watermark in
>>> *MatchPollFn* to *EPOCH* is the correct way to go.
>>>
>>>
>>> On Wed, Oct 14, 2020 at 9:49 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> FYI this is a major limitation in FileIO.match's watermarking ability.
>>>> I believe there is a JIRA issue about this, but nobody has ever worked on
>>>> improving it.
>>>>
>>>> On Wed, Oct 14, 2020 at 9:38 AM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> FileIO.match doesn't allow one to configure how the watermark advances
>>>>> and it assumes that the watermark during polling is always the current
>>>>> system time[1].
>>>>>
>>>>> Because of this the downstream watermark advancement is limited. When
>>>>> an element and restriction pair starts processing, the furthest back you
>>>>> can hold the output watermark for that pair is the current input
>>>>> watermark. A common choice is to use the current element's timestamp as
>>>>> the lower bound for all future output, but if that element is late, the
>>>>> output you produce may or may not be late, depending on the downstream
>>>>> windowing strategy. Holding this watermark back is important since many
>>>>> of these elements and restrictions could be processed in parallel at
>>>>> different rates.
>>>>>
>>>>> Based upon your implementation, you wouldn't need to control the
>>>>> watermark from the file reading splittable DoFn if FileIO.match allowed you
>>>>> to say what the watermark is after each polling round and allowed you to
>>>>> set the timestamp for each match found. This initial setting of the
>>>>> watermark during polling would be properly handled by the runner to block
>>>>> watermark advancement for those elements.
>>>>>
>>>>> Minor comments not related to your issue but would improve your
>>>>> implementation:
>>>>> 1) Typically you set the watermark right before returning. You are
>>>>> missing this from the failed tryClaim loop return.
>>>>> 2) You should structure your loop not based upon the end of the
>>>>> current restriction but continue processing till tryClaim fails. For
>>>>> example:
>>>>>       @ProcessElement
>>>>>       public void processElement(
>>>>>           @Element String fileName,
>>>>>           RestrictionTracker<OffsetRange, Long> tracker,
>>>>>           OutputReceiver<Integer> outputReceiver) throws IOException {
>>>>>         RandomAccessFile file = new RandomAccessFile(fileName, "r");
>>>>>         seekToNextRecordBoundaryInFile(
>>>>>             file, tracker.currentRestriction().getFrom());
>>>>>         while (tracker.tryClaim(file.getFilePointer())) {
>>>>>           outputReceiver.output(readNextRecord(file));
>>>>>         }
>>>>>       }
>>>>> 3) ensureTimestampWithinBounds is dangerous because it masks a possible
>>>>> data issue; for example, the code may have parsed some filename incorrectly.
>>>>> It is likely that you copied this from Beam code and it is used there
>>>>> because user implementations of UnboundedSource were incorrectly setting
>>>>> the watermark outside of the bounds and there is no way to fix them.
>>>>>
>>>>> 1:
>>>>> https://github.com/apache/beam/blob/29787b38b594e29428adaf230b45f9b33e24fa66/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L666
>>>>>
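To illustrate point 3 above, here is a minimal sketch of failing loudly instead of clamping an out-of-range timestamp. It uses only java.time; the class name, the method name requireWithinBounds, and the MIN/MAX constants are hypothetical stand-ins for illustration (real pipeline code would presumably compare against the SDK's own bounds such as BoundedWindow.TIMESTAMP_MIN_VALUE and TIMESTAMP_MAX_VALUE).

```java
import java.time.Instant;

class TimestampBoundsSketch {
  // Hypothetical bounds chosen for illustration only; not Beam's actual limits.
  static final Instant MIN = Instant.parse("1970-01-01T00:00:00Z");
  static final Instant MAX = Instant.parse("2100-01-01T00:00:00Z");

  // Returns the timestamp unchanged when it is in range; otherwise throws so that
  // a badly parsed filename surfaces as an error instead of being silently clamped.
  static Instant requireWithinBounds(Instant timestamp, String source) {
    if (timestamp.isBefore(MIN) || timestamp.isAfter(MAX)) {
      throw new IllegalArgumentException(
          "Timestamp " + timestamp + " derived from '" + source + "' is out of bounds");
    }
    return timestamp;
  }

  public static void main(String[] args) {
    System.out.println(
        requireWithinBounds(Instant.parse("2020-01-01T00:00:00Z"), "2020-01-01"));
  }
}
```

With this shape, a filename that parses to an implausible date stops the bundle with a descriptive message rather than producing a watermark pinned to one of the bounds.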
>>>>> On Tue, Oct 13, 2020 at 6:04 PM Piotr Filipiuk <
>>>>> piotr.filipiuk@gmail.com> wrote:
>>>>>
>>>>>> Thank you for a quick response. I tried to follow the doc attached
>>>>>> and read existing Beam code that uses the Splittable DoFns, and I made some
>>>>>> progress.
>>>>>>
>>>>>> I created a simple pipeline that matches a given filepattern and uses a
>>>>>> splittable DoFn to control event times and watermarks. The pipeline expects
>>>>>> files with the following name patterns:
>>>>>>
>>>>>> *yyyy-MM-dd*
>>>>>>
>>>>>> *yyyy-MM-dd.complete*
>>>>>>
>>>>>> Every time it sees *yyyy-MM-dd*, it reads its contents and outputs
>>>>>> lines of the file using *outputWithTimestamp(..., timestamp)*.
>>>>>> Additionally, it calls *watermarkEstimator.setWatermark(timestamp)*.
>>>>>> In both cases the timestamp is *yyyy-MM-ddT00:00:00.000Z*.
>>>>>>
>>>>>> Once the pipeline matches *yyyy-MM-dd.complete* (which is empty) it
>>>>>> calls *watermarkEstimator.setWatermark(timestamp)*, where timestamp
>>>>>> is *yyyy-MM-ddT00:00:00.000Z plus one day* - hence it advances to
>>>>>> the next day.
>>>>>>
>>>>>> I am at the point where the following unit test fails the inWindow()
>>>>>> assertions, while the last assertion passes. It seems that even though I
>>>>>> call watermarkEstimator.setWatermark(), the window is not being closed.
>>>>>>
>>>>>> I would appreciate help/suggestions on what I am missing.
>>>>>>
>>>>>> Here is a unit test. The function being tested is getData() defined
>>>>>> below.
>>>>>>
>>>>>> public void testGetDataWithNewFiles() throws InterruptedException {
>>>>>>   final Duration duration = Duration.standardDays(1);
>>>>>>
>>>>>>   IntervalWindow firstWindow =
>>>>>>       new IntervalWindow(Instant.parse("2020-01-01T00:00:00.000Z"), duration);
>>>>>>   logger.info("first window {}", firstWindow);
>>>>>>   IntervalWindow secondWindow =
>>>>>>       new IntervalWindow(Instant.parse("2020-01-02T00:00:00.000Z"), duration);
>>>>>>   logger.info("second window {}", secondWindow);
>>>>>>
>>>>>>   MatchConfiguration matchConfiguration =
>>>>>>       MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)
>>>>>>           .continuously(
>>>>>>               Duration.millis(100),
>>>>>>               Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(5)));
>>>>>>
>>>>>>   PCollection<KV<String, Long>> output =
>>>>>>       FileProcessing.getData(p, tmpFolder.getRoot().getAbsolutePath() + "/*", matchConfiguration)
>>>>>>           .apply("Window", Window.into(FixedWindows.of(duration)))
>>>>>>           .apply("LogWindowedResult", Log.ofElements("testGetData"));
>>>>>>
>>>>>>   assertEquals(PCollection.IsBounded.UNBOUNDED, output.isBounded());
>>>>>>
>>>>>>   Thread writer =
>>>>>>       new Thread(
>>>>>>           () -> {
>>>>>>             try {
>>>>>>               Thread.sleep(1000);
>>>>>>
>>>>>>               Path firstPath = tmpFolder.newFile("2020-01-01").toPath();
>>>>>>               Files.write(firstPath, Arrays.asList("1", "2", "3"));
>>>>>>
>>>>>>               Thread.sleep(1000);
>>>>>>
>>>>>>               Path firstPathComplete = tmpFolder.newFile("2020-01-01.complete").toPath();
>>>>>>               Files.write(firstPathComplete, Arrays.asList());
>>>>>>
>>>>>>               Thread.sleep(1000);
>>>>>>
>>>>>>               Path secondPath = tmpFolder.newFile("2020-01-02").toPath();
>>>>>>               Files.write(secondPath, Arrays.asList("4", "5", "6"));
>>>>>>
>>>>>>               Thread.sleep(1000);
>>>>>>
>>>>>>               Path secondPathComplete = tmpFolder.newFile("2020-01-02.complete").toPath();
>>>>>>               Files.write(secondPathComplete, Arrays.asList());
>>>>>>
>>>>>>             } catch (IOException | InterruptedException e) {
>>>>>>               throw new RuntimeException(e);
>>>>>>             }
>>>>>>           });
>>>>>>   writer.start();
>>>>>>
>>>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>>   PAssert.that(output)
>>>>>>       .inWindow(firstWindow)
>>>>>>       .containsInAnyOrder(KV.of("my-key", 1L), KV.of("my-key", 2L), KV.of("my-key", 3L));
>>>>>>
>>>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>>   PAssert.that(output)
>>>>>>       .inWindow(secondWindow)
>>>>>>       .containsInAnyOrder(KV.of("my-key", 4L), KV.of("my-key", 5L), KV.of("my-key", 6L));
>>>>>>
>>>>>>   // THIS ASSERTION PASSES.
>>>>>>   PAssert.that(output)
>>>>>>       .containsInAnyOrder(
>>>>>>           KV.of("my-key", 1L),
>>>>>>           KV.of("my-key", 2L),
>>>>>>           KV.of("my-key", 3L),
>>>>>>           KV.of("my-key", 4L),
>>>>>>           KV.of("my-key", 5L),
>>>>>>           KV.of("my-key", 6L));
>>>>>>
>>>>>>   p.run();
>>>>>>
>>>>>>   writer.join();
>>>>>> }
>>>>>>
>>>>>> Here is the code. Essentially, I am using *FileIO.match()* to match
>>>>>> the filepattern. Then the file *Metadata* is processed by my custom
>>>>>> Splittable DoFn.
>>>>>>
>>>>>> static PCollection<KV<String, Long>> getData(
>>>>>>     Pipeline pipeline, String filepattern, MatchConfiguration matchConfiguration) {
>>>>>>   PCollection<Metadata> matches =
>>>>>>       pipeline.apply(
>>>>>>           FileIO.match().filepattern(filepattern).withConfiguration(matchConfiguration));
>>>>>>   return matches.apply(ParDo.of(new ReadFileFn())).apply(Log.ofElements("Get Data"));
>>>>>> }
>>>>>>
>>>>>> /**
>>>>>>  * Processes matched files by outputting key-value pairs where key is equal to "my-key" and values
>>>>>>  * are Long values corresponding to the lines in the file. If a line cannot be parsed as a Long,
>>>>>>  * Long.parseLong throws a NumberFormatException.
>>>>>>  */
>>>>>> @DoFn.BoundedPerElement
>>>>>> private static final class ReadFileFn extends DoFn<Metadata, KV<String, Long>> {
>>>>>>   private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);
>>>>>>
>>>>>>   @ProcessElement
>>>>>>   public void processElement(
>>>>>>       ProcessContext c,
>>>>>>       RestrictionTracker<OffsetRange, Long> tracker,
>>>>>>       ManualWatermarkEstimator<Instant> watermarkEstimator)
>>>>>>       throws IOException {
>>>>>>     Metadata metadata = c.element();
>>>>>>     logger.info(
>>>>>>         "reading {} with restriction {} @ {}",
>>>>>>         metadata,
>>>>>>         tracker.currentRestriction(),
>>>>>>         c.timestamp());
>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>     Instant timestamp = getTimestamp(filename);
>>>>>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>>>>       String line;
>>>>>>       for (long lineNumber = 0; (line = br.readLine()) != null; ++lineNumber) {
>>>>>>         if (lineNumber < tracker.currentRestriction().getFrom()
>>>>>>             || lineNumber >= tracker.currentRestriction().getTo()) {
>>>>>>           continue;
>>>>>>         }
>>>>>>         if (!tracker.tryClaim(lineNumber)) {
>>>>>>           logger.info("failed to claim {}", lineNumber);
>>>>>>           return;
>>>>>>         }
>>>>>>         c.outputWithTimestamp(KV.of("my-key", Long.parseLong(line)), timestamp);
>>>>>>       }
>>>>>>     }
>>>>>>     logger.info("setting watermark to {}", timestamp);
>>>>>>     watermarkEstimator.setWatermark(timestamp);
>>>>>>     logger.info("Finish processing {} in file {}", tracker.currentRestriction(), filename);
>>>>>>   }
>>>>>>
>>>>>>   private Instant getTimestamp(String filepath) {
>>>>>>     // Filename is assumed to be either yyyy-MM-dd or yyyy-MM-dd.complete.
>>>>>>     String filename = Paths.get(filepath).getFileName().toString();
>>>>>>     int index = filename.lastIndexOf(".complete");
>>>>>>     if (index != -1) {
>>>>>>       // In the case it has a suffix, strip it.
>>>>>>       filename = filename.substring(0, index);
>>>>>>     }
>>>>>>     Instant timestamp =
>>>>>>         Instant.parse(new StringBuilder().append(filename).append("T00:00:00.000Z").toString());
>>>>>>     if (index != -1) {
>>>>>>       // In the case it has a suffix i.e. it is complete, fast forward to the next day.
>>>>>>       return timestamp.plus(Duration.standardDays(1));
>>>>>>     }
>>>>>>     return timestamp;
>>>>>>   }
>>>>>>
>>>>>>   @GetInitialRestriction
>>>>>>   public OffsetRange getInitialRestriction(@Element Metadata metadata) throws IOException {
>>>>>>     long lineCount;
>>>>>>     try (Stream<String> stream = Files.lines(Paths.get(metadata.resourceId().toString()))) {
>>>>>>       lineCount = stream.count();
>>>>>>     }
>>>>>>     return new OffsetRange(0L, lineCount);
>>>>>>   }
>>>>>>
>>>>>>   @GetInitialWatermarkEstimatorState
>>>>>>   public Instant getInitialWatermarkEstimatorState(
>>>>>>       @Element Metadata metadata, @Restriction OffsetRange restriction) {
>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>     logger.info("getInitialWatermarkEstimatorState {}", filename);
>>>>>>     // Compute and return the initial watermark estimator state for each element and restriction.
>>>>>>     // All subsequent processing of an element and restriction will be restored from the existing
>>>>>>     // state.
>>>>>>     return getTimestamp(filename);
>>>>>>   }
>>>>>>
>>>>>>   private static Instant ensureTimestampWithinBounds(Instant timestamp) {
>>>>>>     if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
>>>>>>       timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
>>>>>>     } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
>>>>>>       timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
>>>>>>     }
>>>>>>     return timestamp;
>>>>>>   }
>>>>>>
>>>>>>   @NewWatermarkEstimator
>>>>>>   public WatermarkEstimators.Manual newWatermarkEstimator(
>>>>>>       @WatermarkEstimatorState Instant watermarkEstimatorState) {
>>>>>>     logger.info("newWatermarkEstimator {}", watermarkEstimatorState);
>>>>>>     return new WatermarkEstimators.Manual(ensureTimestampWithinBounds(watermarkEstimatorState));
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 8, 2020 at 2:15 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> I'm working on a blog post[1] about splittable dofns that covers
>>>>>>> this topic.
>>>>>>>
>>>>>>> The TLDR; is that FileIO.match() should allow users to control the
>>>>>>> watermark estimator that is used and for your use case you should hold the
>>>>>>> watermark to some computable value (e.g. the files are generated every hour
>>>>>>> so once you know the last file has appeared for that hour you advance the
>>>>>>> watermark to the current hour).
>>>>>>>
>>>>>>> 1:
>>>>>>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#heading=h.fo3wm9qs0vql
>>>>>>>
>>>>>>> On Thu, Oct 8, 2020 at 1:55 PM Piotr Filipiuk <
>>>>>>> piotr.filipiuk@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am looking into:
>>>>>>>> https://beam.apache.org/documentation/patterns/file-processing/
>>>>>>>> since I would like to create a continuous pipeline that reads from files
>>>>>>>> and assigns Event Times based on e.g. file metadata or actual data inside
>>>>>>>> the file. For example:
>>>>>>>>
>>>>>>>> private static void run(String[] args) {
>>>>>>>>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>>>>>>>>   Pipeline pipeline = Pipeline.create(options);
>>>>>>>>
>>>>>>>>   PCollection<Metadata> matches = pipeline
>>>>>>>>       .apply(FileIO.match()
>>>>>>>>           .filepattern("/tmp/input/*")
>>>>>>>>           .continuously(Duration.standardSeconds(15), Watch.Growth.never()));
>>>>>>>>   matches
>>>>>>>>       .apply(ParDo.of(new ReadFileFn()))
>>>>>>>>
>>>>>>>>   pipeline.run();
>>>>>>>> }
>>>>>>>>
>>>>>>>> private static final class ReadFileFn extends DoFn<Metadata, String> {
>>>>>>>>   private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);
>>>>>>>>
>>>>>>>>   @ProcessElement
>>>>>>>>   public void processElement(ProcessContext c) throws IOException {
>>>>>>>>     Metadata metadata = c.element();
>>>>>>>>     // I believe c.timestamp() is based on processing time.
>>>>>>>>     logger.info("reading {} @ {}", metadata, c.timestamp());
>>>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>>>     // Output timestamps must be no earlier than the timestamp of the
>>>>>>>>     // current input minus the allowed skew (0 milliseconds).
>>>>>>>>     Instant timestamp = new Instant(metadata.lastModifiedMillis());
>>>>>>>>     logger.info("lastModified @ {}", timestamp);
>>>>>>>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>>>>>>       String line;
>>>>>>>>       while ((line = br.readLine()) != null) {
>>>>>>>>         c.outputWithTimestamp(line, c.timestamp());
>>>>>>>>       }
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> The issue is that when calling c.outputWithTimestamp() I am getting:
>>>>>>>>
>>>>>>>> Caused by: java.lang.IllegalArgumentException: Cannot output with
>>>>>>>> timestamp 1970-01-01T00:00:00.000Z. Output timestamps must be no earlier
>>>>>>>> than the timestamp of the current input (2020-10-08T20:39:44.286Z) minus
>>>>>>>> the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
>>>>>>>> Javadoc for details on changing the allowed skew.
>>>>>>>>
>>>>>>>> I believe this is because MatchPollFn.apply() uses Instant.now() as
>>>>>>>> the event time for the PCollection<Metadata>. I can see that the
>>>>>>>> call to continuously() makes the PCollection unbounded and assigns
>>>>>>>> default Event Time. Without the call to continuously() I can assign the
>>>>>>>> timestamps without problems either via c.outputWithTimestamp or
>>>>>>>> WithTimestamp transform.
>>>>>>>>
>>>>>>>> I would like to know what is the way to fix the issue, and whether
>>>>>>>> this use-case is currently supported in Beam.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards,
>>>>>>>> Piotr
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards,
>>>>>> Piotr
>>>>>>
>>>>>
>>>
>>> --
>>> Best regards,
>>> Piotr
>>>
>>
>
> --
> Best regards,
> Piotr
>


-- 
Best regards,
Piotr

Re: Processing files as they arrive with custom timestamps

Posted by Piotr Filipiuk <pi...@gmail.com>.
Got it, thank you for the clarification.

I tried to run the pipeline locally, with the following main (see full
source code attached):

public static void main(String[] args) {
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
  Pipeline pipeline = Pipeline.create(options);
  logger.info("running");

  PCollection<KV<String, Long>> output =
      FileProcessing.getData(
          pipeline, "/tmp/input/*", Duration.standardSeconds(1), Growth.never());

  output
      .apply("Window", Window.into(FixedWindows.of(Duration.standardDays(1))))
      .apply("LogWindowed", Log.ofElements("testGetData"))
      .apply(Sum.longsPerKey())
      .apply(
          "FormatResults",
          MapElements.into(TypeDescriptors.strings())
              .via((KV<String, Long> kv) ->
                  String.format("%s,%s", kv.getKey(), kv.getValue())))
      .apply("LogResults", Log.ofElements("results"))
      .apply(
          TextIO.write()
              .to(Paths.get("/tmp/output/").resolve("Results").toString())
              .withWindowedWrites()
              .withNumShards(1));

  pipeline.run();
}
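
A side note on the FormatResults step, as a minimal standalone sketch: String.format interprets printf-style specifiers such as %s and %d, whereas {} placeholders are an SLF4J logging convention that String.format passes through untouched. The class and method names below are made up for illustration.

```java
class FormatCheck {
  // Formats a key/value pair as "key,value" using printf-style specifiers.
  static String formatResult(String key, long value) {
    return String.format("%s,%d", key, value);
  }

  public static void main(String[] args) {
    // "{}" is not a format specifier, so the extra arguments are ignored
    // and the pattern is printed as-is.
    System.out.println(String.format("{},{}", "my-key", 10L));
    // With %s and %d the arguments are substituted.
    System.out.println(formatResult("my-key", 10L));
  }
}
```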


Then I am generating files using:

for i in {01..30}; do
  echo "handling $i"
  printf '1\n2\n3\n4\n' > /tmp/input/1985-10-$i
  sleep 2
  touch /tmp/input/1985-10-$i.complete
  sleep 2
done

I do not see any output being generated, though. Can you elaborate on why
that might be? I would expect that once the watermark is set to day+1, the
results of the previous day are finalized, and hence the result for that
window is emitted.
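
To make that expectation concrete, here is a small sketch in plain java.time (no Beam classes) of when a one-day fixed window would be considered complete. It rests on two assumptions that are my understanding rather than something confirmed in this thread: a window [start, end) is complete once the watermark reaches its end, and the watermark seen by the windowed aggregation is the minimum of all upstream holds. All names are made up for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class WindowClosingSketch {
  static final Duration WINDOW_SIZE = Duration.ofDays(1);

  // Start of the epoch-aligned one-day window that contains the timestamp.
  static Instant windowStart(Instant timestamp) {
    long size = WINDOW_SIZE.toMillis();
    long millis = timestamp.toEpochMilli();
    return Instant.ofEpochMilli(millis - Math.floorMod(millis, size));
  }

  // Assumption: the downstream watermark is the minimum of all upstream holds.
  // The list must not be empty.
  static Instant effectiveWatermark(List<Instant> holds) {
    return Collections.min(holds);
  }

  // Assumption: a window [start, start + size) is complete once the watermark
  // is at or past its end.
  static boolean isComplete(Instant windowStart, Instant watermark) {
    return !watermark.isBefore(windowStart.plus(WINDOW_SIZE));
  }

  public static void main(String[] args) {
    Instant start = windowStart(Instant.parse("1985-10-01T12:00:00Z"));
    Instant afterMarker = Instant.parse("1985-10-02T00:00:00Z");
    // Only the day+1 watermark is in play.
    System.out.println(isComplete(start, afterMarker));
    // Another hold is still sitting at EPOCH.
    System.out.println(
        isComplete(start, effectiveWatermark(Arrays.asList(afterMarker, Instant.EPOCH))));
  }
}
```

Under these assumptions, a single hold that stays at an early value is enough to keep the window open, no matter how far the other holds have advanced.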

On Wed, Oct 14, 2020 at 1:41 PM Luke Cwik <lc...@google.com> wrote:

> I think you should be using the largest "complete" timestamp from the
> metadata results and not be setting the watermark if you don't have one.
>
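One way to read the suggestion above, as a hedged sketch in plain Java rather than Beam code: given the file names returned by one polling round, take the latest "yyyy-MM-dd.complete" marker and derive the watermark from it, and report no watermark at all when no marker is present. Treating a marker for day D as "watermark = start of D + 1" follows the convention used earlier in this thread; the class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class CompleteWatermarkSketch {
  static final String SUFFIX = ".complete";

  // Returns the start of the day after the latest "yyyy-MM-dd.complete" marker,
  // or Optional.empty() when the list contains no marker, in which case the
  // caller would not set a watermark for this polling round.
  static Optional<Instant> watermarkFor(List<String> fileNames) {
    return fileNames.stream()
        .filter(name -> name.endsWith(SUFFIX))
        .map(name -> LocalDate.parse(name.substring(0, name.length() - SUFFIX.length())))
        .max(LocalDate::compareTo)
        .map(day -> day.plusDays(1).atStartOfDay(ZoneOffset.UTC).toInstant());
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList("2020-01-01", "2020-01-01.complete", "2020-01-02");
    System.out.println(watermarkFor(names));
    System.out.println(watermarkFor(Arrays.asList("2020-01-01")));
  }
}
```

In a PollFn, a result like this could presumably feed withWatermark(...) only when the Optional is present; that wiring is not shown here and is an assumption.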
> On Wed, Oct 14, 2020 at 11:47 AM Piotr Filipiuk <pi...@gmail.com>
> wrote:
>
>> Thank you so much for the input, that was extremely helpful!
>>
>> I changed the pipeline from using FileIO.match() into using a custom
>> matching (very similar to the FileIO.match()) that looks as follows:
>>
>> static PCollection<KV<String, Long>> getData(
>>     Pipeline pipeline,
>>     String filepattern,
>>     Duration pollInterval,
>>     TerminationCondition terminationCondition) {
>>   final Growth<String, MatchResult.Metadata, String> stringMetadataStringGrowth =
>>       Watch.growthOf(
>>               Contextful.of(new MatchPollFn(), Requirements.empty()), new ExtractFilenameFn())
>>           .withPollInterval(pollInterval)
>>           .withTerminationPerInput(terminationCondition);
>>   return pipeline
>>       .apply("Create filepattern", Create.<String>of(filepattern))
>>       .apply("Continuously match filepatterns", stringMetadataStringGrowth)
>>       .apply(Values.create())
>>       .apply(ParDo.of(new ReadFileFn()));
>> }
>>
>> private static class MatchPollFn extends PollFn<String, Metadata> {
>>   private static final Logger logger = LoggerFactory.getLogger(MatchPollFn.class);
>>
>>   @Override
>>   public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
>>       throws Exception {
>>     // Here we only have the filepattern i.e. element, and hence we do not know what the timestamp
>>     // and/or watermark should be. As a result, we output EPOCH as both the timestamp and the
>>     // watermark.
>>     Instant instant = Instant.EPOCH;
>>     return Watch.Growth.PollResult.incomplete(
>>             instant, FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata())
>>         .withWatermark(instant);
>>   }
>> }
>>
>> private static class ExtractFilenameFn implements SerializableFunction<Metadata, String> {
>>   @Override
>>   public String apply(MatchResult.Metadata input) {
>>     return input.resourceId().toString();
>>   }
>> }
>>
>> The above, together with fixing the bugs that Luke pointed out (thank you,
>> Luke!), makes the unit test pass.
>>
>> Thank you again!
>>
>> If you have any feedback on the current code, I would appreciate it. I
>> am especially interested in whether setting the event time and watermark in
>> *MatchPollFn* to *EPOCH* is the correct way to go.
>>
>>
>> On Wed, Oct 14, 2020 at 9:49 AM Reuven Lax <re...@google.com> wrote:
>>
>>> FYI this is a major limitation in FileIO.match's watermarking ability. I
>>> believe there is a JIRA issue about this, but nobody has ever worked on
>>> improving it.
>>>
>>> On Wed, Oct 14, 2020 at 9:38 AM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> FileIO.match doesn't allow one to configure how the watermark advances
>>>> and it assumes that the watermark during polling is always the current
>>>> system time[1].
>>>>
>>>> Because of this the downstream watermark advancement is limited. When
>>>> an element and restriction starts processing, the maximum you can hold the
>>>> output watermark back by for this element and restriction pair is limited
>>>> to the current input watermark (a common value to use is the current
>>>> element's timestamp as the lower bound for all future output but if that
>>>> element is late the output you produce may or may not be late (depends on
>>>> downstream windowing strategy)). Holding this watermark back is important
>>>> since many of these elements and restrictions could be processed in
>>>> parallel at different rates.
>>>>
>>>> Based upon your implementation, you wouldn't need to control the
>>>> watermark from the file reading splittable DoFn if FileIO.match allowed you
>>>> to say what the watermark is after each polling round and allowed you to
>>>> set the timestamp for each match found. This initial setting of the
>>>> watermark during polling would be properly handled by the runner to block
>>>> watermark advancement for those elements.
>>>>
>>>> Minor comments that are not related to your issue but would improve your
>>>> implementation:
>>>> 1) Typically you set the watermark right before returning. You are
>>>> missing this from the failed tryClaim loop return.
>>>> 2) You should structure your loop not based upon the end of the current
>>>> restriction but continue processing till tryClaim fails. For example:
>>>>       @ProcessElement
>>>>       public void processElement(
>>>>           @Element String fileName,
>>>>           RestrictionTracker<OffsetRange, Long> tracker,
>>>>           OutputReceiver<Integer> outputReceiver) throws IOException {
>>>>         RandomAccessFile file = new RandomAccessFile(fileName, "r");
>>>>         seekToNextRecordBoundaryInFile(
>>>>             file, tracker.currentRestriction().getFrom());
>>>>         while (tracker.tryClaim(file.getFilePointer())) {
>>>>           outputReceiver.output(readNextRecord(file));
>>>>         }
>>>>       }
>>>> 3) ensureTimestampWithinBounds is dangerous because it masks a possible
>>>> data issue; for example, the code may have parsed some filename incorrectly.
>>>> It is likely that you copied this from Beam code and it is used there
>>>> because user implementations of UnboundedSource were incorrectly setting
>>>> the watermark outside of the bounds and there is no way to fix them.
>>>>
>>>> 1:
>>>> https://github.com/apache/beam/blob/29787b38b594e29428adaf230b45f9b33e24fa66/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L666
>>>>
>>>> On Tue, Oct 13, 2020 at 6:04 PM Piotr Filipiuk <
>>>> piotr.filipiuk@gmail.com> wrote:
>>>>
>>>>> Thank you for a quick response. I tried to follow the doc attached and
>>>>> read existing Beam code that uses the Splittable DoFns, and I made some
>>>>> progress.
>>>>>
>>>>> I created a simple pipeline that matches a given filepattern and uses a
>>>>> splittable DoFn to control event times and watermarks. The pipeline expects
>>>>> files with the following name patterns:
>>>>>
>>>>> *yyyy-MM-dd*
>>>>>
>>>>> *yyyy-MM-dd.complete*
>>>>>
>>>>> Every time it sees *yyyy-MM-dd*, it reads its contents and outputs
>>>>> lines of the file using *outputWithTimestamp(..., timestamp)*.
>>>>> Additionally, it calls *watermarkEstimator.setWatermark(timestamp)*.
>>>>> In both cases the timestamp is *yyyy-MM-ddT00:00:00.000Z*.
>>>>>
>>>>> Once the pipeline matches *yyyy-MM-dd.complete* (which is empty) it
>>>>> calls *watermarkEstimator.setWatermark(timestamp)*, where timestamp
>>>>> is *yyyy-MM-ddT00:00:00.000Z plus one day* - hence it advances to the
>>>>> next day.
>>>>>
>>>>> I am at the point where the following unit test fails the inWindow()
>>>>> assertions, while the last assertion passes. It seems that even though I
>>>>> call watermarkEstimator.setWatermark(), the window is not being closed.
>>>>>
>>>>> I would appreciate help/suggestions on what I am missing.
>>>>>
>>>>> Here is a unit test. The function being tested is getData() defined
>>>>> below.
>>>>>
>>>>> public void testGetDataWithNewFiles() throws InterruptedException {
>>>>>   final Duration duration = Duration.standardDays(1);
>>>>>
>>>>>   IntervalWindow firstWindow =
>>>>>       new IntervalWindow(Instant.parse("2020-01-01T00:00:00.000Z"), duration);
>>>>>   logger.info("first window {}", firstWindow);
>>>>>   IntervalWindow secondWindow =
>>>>>       new IntervalWindow(Instant.parse("2020-01-02T00:00:00.000Z"), duration);
>>>>>   logger.info("second window {}", secondWindow);
>>>>>
>>>>>   MatchConfiguration matchConfiguration =
>>>>>       MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)
>>>>>           .continuously(
>>>>>               Duration.millis(100),
>>>>>               Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(5)));
>>>>>
>>>>>   PCollection<KV<String, Long>> output =
>>>>>       FileProcessing.getData(p, tmpFolder.getRoot().getAbsolutePath() + "/*", matchConfiguration)
>>>>>           .apply("Window", Window.into(FixedWindows.of(duration)))
>>>>>           .apply("LogWindowedResult", Log.ofElements("testGetData"));
>>>>>
>>>>>   assertEquals(PCollection.IsBounded.UNBOUNDED, output.isBounded());
>>>>>
>>>>>   Thread writer =
>>>>>       new Thread(
>>>>>           () -> {
>>>>>             try {
>>>>>               Thread.sleep(1000);
>>>>>
>>>>>               Path firstPath = tmpFolder.newFile("2020-01-01").toPath();
>>>>>               Files.write(firstPath, Arrays.asList("1", "2", "3"));
>>>>>
>>>>>               Thread.sleep(1000);
>>>>>
>>>>>               Path firstPathComplete = tmpFolder.newFile("2020-01-01.complete").toPath();
>>>>>               Files.write(firstPathComplete, Arrays.asList());
>>>>>
>>>>>               Thread.sleep(1000);
>>>>>
>>>>>               Path secondPath = tmpFolder.newFile("2020-01-02").toPath();
>>>>>               Files.write(secondPath, Arrays.asList("4", "5", "6"));
>>>>>
>>>>>               Thread.sleep(1000);
>>>>>
>>>>>               Path secondPathComplete = tmpFolder.newFile("2020-01-02.complete").toPath();
>>>>>               Files.write(secondPathComplete, Arrays.asList());
>>>>>
>>>>>             } catch (IOException | InterruptedException e) {
>>>>>               throw new RuntimeException(e);
>>>>>             }
>>>>>           });
>>>>>   writer.start();
>>>>>
>>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>   PAssert.that(output)
>>>>>       .inWindow(firstWindow)
>>>>>       .containsInAnyOrder(KV.of("my-key", 1L), KV.of("my-key", 2L), KV.of("my-key", 3L));
>>>>>
>>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>   PAssert.that(output)
>>>>>       .inWindow(secondWindow)
>>>>>       .containsInAnyOrder(KV.of("my-key", 4L), KV.of("my-key", 5L), KV.of("my-key", 6L));
>>>>>
>>>>>   // THIS ASSERTION PASSES.
>>>>>   PAssert.that(output)
>>>>>       .containsInAnyOrder(
>>>>>           KV.of("my-key", 1L),
>>>>>           KV.of("my-key", 2L),
>>>>>           KV.of("my-key", 3L),
>>>>>           KV.of("my-key", 4L),
>>>>>           KV.of("my-key", 5L),
>>>>>           KV.of("my-key", 6L));
>>>>>
>>>>>   p.run();
>>>>>
>>>>>   writer.join();
>>>>> }
>>>>>
>>>>> Here is the code. Essentially, I am using *FileIO.match()* to match
>>>>> the filepattern. Then the file *Metadata* is processed by my custom
>>>>> Splittable DoFn.
>>>>>
>>>>> static PCollection<KV<String, Long>> getData(
>>>>>     Pipeline pipeline, String filepattern, MatchConfiguration matchConfiguration) {
>>>>>   PCollection<Metadata> matches =
>>>>>       pipeline.apply(
>>>>>           FileIO.match().filepattern(filepattern).withConfiguration(matchConfiguration));
>>>>>   return matches.apply(ParDo.of(new ReadFileFn())).apply(Log.ofElements("Get Data"));
>>>>> }
>>>>>
>>>>> /**
>>>>>  * Processes matched files by outputting key-value pairs where key is equal to "my-key" and values
>>>>>  * are Long values corresponding to the lines in the file. If a line cannot be parsed as a Long,
>>>>>  * Long.parseLong throws a NumberFormatException.
>>>>>  */
>>>>> @DoFn.BoundedPerElement
>>>>> private static final class ReadFileFn extends DoFn<Metadata, KV<String, Long>> {
>>>>>   private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);
>>>>>
>>>>>   @ProcessElement
>>>>>   public void processElement(
>>>>>       ProcessContext c,
>>>>>       RestrictionTracker<OffsetRange, Long> tracker,
>>>>>       ManualWatermarkEstimator<Instant> watermarkEstimator)
>>>>>       throws IOException {
>>>>>     Metadata metadata = c.element();
>>>>>     logger.info(
>>>>>         "reading {} with restriction {} @ {}",
>>>>>         metadata,
>>>>>         tracker.currentRestriction(),
>>>>>         c.timestamp());
>>>>>     String filename = metadata.resourceId().toString();
>>>>>     Instant timestamp = getTimestamp(filename);
>>>>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>>>       String line;
>>>>>       for (long lineNumber = 0; (line = br.readLine()) != null; ++lineNumber) {
>>>>>         if (lineNumber < tracker.currentRestriction().getFrom()
>>>>>             || lineNumber >= tracker.currentRestriction().getTo()) {
>>>>>           continue;
>>>>>         }
>>>>>         if (!tracker.tryClaim(lineNumber)) {
>>>>>           logger.info("failed to claim {}", lineNumber);
>>>>>           return;
>>>>>         }
>>>>>         c.outputWithTimestamp(KV.of("my-key", Long.parseLong(line)), timestamp);
>>>>>       }
>>>>>     }
>>>>>     logger.info("setting watermark to {}", timestamp);
>>>>>     watermarkEstimator.setWatermark(timestamp);
>>>>>     logger.info("Finish processing {} in file {}", tracker.currentRestriction(), filename);
>>>>>   }
>>>>>
>>>>>   private Instant getTimestamp(String filepath) {
>>>>>     // Filename is assumed to be either yyyy-MM-dd or yyyy-MM-dd.complete.
>>>>>     String filename = Paths.get(filepath).getFileName().toString();
>>>>>     int index = filename.lastIndexOf(".complete");
>>>>>     if (index != -1) {
>>>>>       // In the case it has a suffix, strip it.
>>>>>       filename = filename.substring(0, index);
>>>>>     }
>>>>>     Instant timestamp =
>>>>>         Instant.parse(new StringBuilder().append(filename).append("T00:00:00.000Z").toString());
>>>>>     if (index != -1) {
>>>>>       // In the case it has a suffix i.e. it is complete, fast forward to the next day.
>>>>>       return timestamp.plus(Duration.standardDays(1));
>>>>>     }
>>>>>     return timestamp;
>>>>>   }
>>>>>
>>>>>   @GetInitialRestriction
>>>>>   public OffsetRange getInitialRestriction(@Element Metadata metadata) throws IOException {
>>>>>     long lineCount;
>>>>>     try (Stream<String> stream = Files.lines(Paths.get(metadata.resourceId().toString()))) {
>>>>>       lineCount = stream.count();
>>>>>     }
>>>>>     return new OffsetRange(0L, lineCount);
>>>>>   }
>>>>>
>>>>>   @GetInitialWatermarkEstimatorState
>>>>>   public Instant getInitialWatermarkEstimatorState(
>>>>>       @Element Metadata metadata, @Restriction OffsetRange restriction) {
>>>>>     String filename = metadata.resourceId().toString();
>>>>>     logger.info("getInitialWatermarkEstimatorState {}", filename);
>>>>>     // Compute and return the initial watermark estimator state for each element and restriction.
>>>>>     // All subsequent processing of an element and restriction will be restored from the existing
>>>>>     // state.
>>>>>     return getTimestamp(filename);
>>>>>   }
>>>>>
>>>>>   private static Instant ensureTimestampWithinBounds(Instant timestamp) {
>>>>>     if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
>>>>>       timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
>>>>>     } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
>>>>>       timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
>>>>>     }
>>>>>     return timestamp;
>>>>>   }
>>>>>
>>>>>   @NewWatermarkEstimator
>>>>>   public WatermarkEstimators.Manual newWatermarkEstimator(
>>>>>       @WatermarkEstimatorState Instant watermarkEstimatorState) {
>>>>>     logger.info("newWatermarkEstimator {}", watermarkEstimatorState);
>>>>>     return new WatermarkEstimators.Manual(ensureTimestampWithinBounds(watermarkEstimatorState));
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Oct 8, 2020 at 2:15 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> I'm working on a blog post[1] about splittable dofns that covers this
>>>>>> topic.
>>>>>>
>>>>>> The TLDR; is that FileIO.match() should allow users to control the
>>>>>> watermark estimator that is used and for your use case you should hold the
>>>>>> watermark to some computable value (e.g. the files are generated every hour
>>>>>> so once you know the last file has appeared for that hour you advance the
>>>>>> watermark to the current hour).
>>>>>>
>>>>>> 1:
>>>>>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#heading=h.fo3wm9qs0vql
>>>>>>
>>>>>> On Thu, Oct 8, 2020 at 1:55 PM Piotr Filipiuk <
>>>>>> piotr.filipiuk@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am looking into:
>>>>>>> https://beam.apache.org/documentation/patterns/file-processing/
>>>>>>> since I would like to create a continuous pipeline that reads from files
>>>>>>> and assigns Event Times based on e.g. file metadata or actual data inside
>>>>>>> the file. For example:
>>>>>>>
>>>>>>> private static void run(String[] args) {
>>>>>>>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>>>>>>>   Pipeline pipeline = Pipeline.create(options);
>>>>>>>
>>>>>>>   PCollection<Metadata> matches = pipeline
>>>>>>>       .apply(FileIO.match()
>>>>>>>           .filepattern("/tmp/input/*")
>>>>>>>           .continuously(Duration.standardSeconds(15), Watch.Growth.never()));
>>>>>>>   matches
>>>>>>>       .apply(ParDo.of(new ReadFileFn()));
>>>>>>>
>>>>>>>   pipeline.run();
>>>>>>> }
>>>>>>>
>>>>>>> private static final class ReadFileFn extends DoFn<Metadata, String> {
>>>>>>>   private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);
>>>>>>>
>>>>>>>   @ProcessElement
>>>>>>>   public void processElement(ProcessContext c) throws IOException {
>>>>>>>     Metadata metadata = c.element();
>>>>>>>     // I believe c.timestamp() is based on processing time.
>>>>>>>     logger.info("reading {} @ {}", metadata, c.timestamp());
>>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>>     // Output timestamps must be no earlier than the timestamp of the
>>>>>>>     // current input minus the allowed skew (0 milliseconds).
>>>>>>>     Instant timestamp = new Instant(metadata.lastModifiedMillis());
>>>>>>>     logger.info("lastModified @ {}", timestamp);
>>>>>>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>>>>>       String line;
>>>>>>>       while ((line = br.readLine()) != null) {
>>>>>>>         c.outputWithTimestamp(line, c.timestamp());
>>>>>>>       }
>>>>>>>     }
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> The issue is that when calling c.outputWithTimestamp() I am getting:
>>>>>>>
>>>>>>> Caused by: java.lang.IllegalArgumentException: Cannot output with
>>>>>>> timestamp 1970-01-01T00:00:00.000Z. Output timestamps must be no earlier
>>>>>>> than the timestamp of the current input (2020-10-08T20:39:44.286Z) minus
>>>>>>> the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
>>>>>>> Javadoc for details on changing the allowed skew.
>>>>>>>
>>>>>>> I believe this is because MatchPollFn.apply() uses Instant.now() as
>>>>>>> the event time for the PCollection<Metadata>. I can see that the
>>>>>>> call to continuously() makes the PCollection unbounded and assigns
>>>>>>> default Event Time. Without the call to continuously() I can assign the
>>>>>>> timestamps without problems either via c.outputWithTimestamp or
>>>>>>> WithTimestamp transform.
>>>>>>>
>>>>>>> I would like to know what is the way to fix the issue, and whether
>>>>>>> this use-case is currently supported in Beam.
>>>>>>>
>>>>>>> --
>>>>>>> Best regards,
>>>>>>> Piotr
>>>>>>>

-- 
Best regards,
Piotr

Re: Processing files as they arrive with custom timestamps

Posted by Luke Cwik <lc...@google.com>.
I think you should be using the largest "complete" timestamp from the
metadata results and not be setting the watermark if you don't have one.
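
A rough, Beam-free sketch of that idea, assuming the yyyy-MM-dd /
yyyy-MM-dd.complete naming used in this thread. The class and method names
are made up for illustration and are not part of Beam. It picks the latest
".complete" marker among the matched file names and returns the start of the
following day, or nothing when no marker has appeared yet:

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Optional;

/** Beam-free sketch: derive a watermark from "yyyy-MM-dd.complete" marker files. */
final class CompleteMarkerWatermark {
  private static final String SUFFIX = ".complete";

  /**
   * Returns the start of the day after the latest completed day, or empty when no
   * marker has been seen yet (in which case the caller should not set a watermark).
   */
  static Optional<Instant> latestCompleteWatermark(List<String> fileNames) {
    return fileNames.stream()
        .filter(name -> name.endsWith(SUFFIX))
        .map(name -> name.substring(0, name.length() - SUFFIX.length()))
        // LocalDate.parse throws DateTimeParseException on a malformed name,
        // which surfaces bad input instead of silently clamping it.
        .map(LocalDate::parse)
        .max(LocalDate::compareTo)
        .map(day -> day.plusDays(1).atStartOfDay(ZoneOffset.UTC).toInstant());
  }

  public static void main(String[] args) {
    // No marker yet: nothing to set.
    System.out.println(latestCompleteWatermark(List.of("2020-01-01")));
    // One completed day: the watermark may move to the start of the next day.
    System.out.println(
        latestCompleteWatermark(List.of("2020-01-01", "2020-01-01.complete", "2020-01-02")));
  }
}
```

In a poll function, the present case would presumably feed withWatermark(...),
and the empty case would leave the watermark untouched.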

On Wed, Oct 14, 2020 at 11:47 AM Piotr Filipiuk <pi...@gmail.com>
wrote:

> Thank you so much for the input, that was extremely helpful!
>
> I changed the pipeline from using FileIO.match() into using a custom
> matching (very similar to the FileIO.match()) that looks as follows:
>
> static PCollection<KV<String, Long>> getData(
>     Pipeline pipeline,
>     String filepattern,
>     Duration pollInterval,
>     TerminationCondition terminationCondition) {
>   final Growth<String, MatchResult.Metadata, String> stringMetadataStringGrowth =
>       Watch.growthOf(
>               Contextful.of(new MatchPollFn(), Requirements.empty()), new ExtractFilenameFn())
>           .withPollInterval(pollInterval)
>           .withTerminationPerInput(terminationCondition);
>   return pipeline
>       .apply("Create filepattern", Create.<String>of(filepattern))
>       .apply("Continuously match filepatterns", stringMetadataStringGrowth)
>       .apply(Values.create())
>       .apply(ParDo.of(new ReadFileFn()));
> }
>
> private static class MatchPollFn extends PollFn<String, Metadata> {
>   private static final Logger logger = LoggerFactory.getLogger(MatchPollFn.class);
>
>   @Override
>   public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
>       throws Exception {
>     // Here we only have the filepattern i.e. element, and hence we do not know what the timestamp
>     // and/or watermark should be. As a result, we output EPOCH as both the timestamp and the
>     // watermark.
>     Instant instant = Instant.EPOCH;
>     return Watch.Growth.PollResult.incomplete(
>             instant, FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata())
>         .withWatermark(instant);
>   }
> }
>
> private static class ExtractFilenameFn implements SerializableFunction<Metadata, String> {
>   @Override
>   public String apply(MatchResult.Metadata input) {
>     return input.resourceId().toString();
>   }
> }
>
> The above, together with fixing the bugs that Luke pointed out (thank you,
> Luke!), makes the unit test pass.
>
> Thank you again!
>
> If you have any feedback on the current code, I would appreciate it. I am
> especially interested in whether setting the event time and watermark in
> *MatchPollFn* to *EPOCH* is the right approach.
>
>
> On Wed, Oct 14, 2020 at 9:49 AM Reuven Lax <re...@google.com> wrote:
>
>> FYI this is a major limitation in FileIO.match's watermarking ability. I
>> believe there is a JIRA issue about this, but nobody has ever worked on
>> improving it.
>>
>> On Wed, Oct 14, 2020 at 9:38 AM Luke Cwik <lc...@google.com> wrote:
>>
>>> FileIO.match doesn't allow one to configure how the watermark advances
>>> and it assumes that the watermark during polling is always the current
>>> system time[1].
>>>
>>> Because of this the downstream watermark advancement is limited. When an
>>> element and restriction starts processing, the maximum you can hold the
>>> output watermark back by for this element and restriction pair is limited
>>> to the current input watermark (a common value to use is the current
>>> element's timestamp as the lower bound for all future output but if that
>>> element is late the output you produce may or may not be late (depends on
>>> downstream windowing strategy)). Holding this watermark back is important
>>> since many of these elements and restrictions could be processed in
>>> parallel at different rates.
>>>
>>> Based upon your implementation, you wouldn't need to control the
>>> watermark from the file reading splittable DoFn if FileIO.match allowed you
>>> to say what the watermark is after each polling round and allowed you to
>>> set the timestamp for each match found. This initial setting of the
>>> watermark during polling would be properly handled by the runner to block
>>> watermark advancement for those elements.
>>>
>>> Minor comments that are not related to your issue but would improve your
>>> implementation:
>>> 1) Typically you set the watermark right before returning. You are
>>> missing this on the return taken when tryClaim fails inside the loop.
>>> 2) You should structure your loop not based upon the end of the current
>>> restriction but continue processing till tryClaim fails. For example:
>>>       @ProcessElement
>>>       public void processElement(
>>>           @Element String fileName,
>>>           RestrictionTracker<OffsetRange, Long> tracker,
>>>           OutputReceiver<Integer> outputReceiver) throws IOException {
>>>         RandomAccessFile file = new RandomAccessFile(fileName, "r");
>>>         seekToNextRecordBoundaryInFile(file, tracker.currentRestriction().getFrom());
>>>         while (tracker.tryClaim(file.getFilePointer())) {
>>>           outputReceiver.output(readNextRecord(file));
>>>         }
>>>       }
>>> 3) ensureTimestampWithinBounds is dangerous as you're masking a possible
>>> data issue, since the code may have parsed some filename incorrectly. It is
>>> likely that you copied this from Beam code and it is used there because
>>> user implementations of UnboundedSource were incorrectly setting the
>>> watermark outside of the bounds and there is no way to fix them.
>>>
>>> 1:
>>> https://github.com/apache/beam/blob/29787b38b594e29428adaf230b45f9b33e24fa66/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L666
>>>
>>> On Tue, Oct 13, 2020 at 6:04 PM Piotr Filipiuk <pi...@gmail.com>
>>> wrote:
>>>
>>>> Thank you for a quick response. I tried to follow the doc attached and
>>>> read existing Beam code that uses the Splittable DoFns, and I made some
>>>> progress.
>>>>
>>>> I created a simple pipeline that matches given filepattern, and uses
>>>> splittable dofn to control event times and watermarks. The pipeline expects
>>>> files with the following name patterns:
>>>>
>>>> *yyyy-MM-dd*
>>>>
>>>> *yyyy-MM-dd.complete*
>>>>
>>>> Every time it sees *yyyy-MM-dd*, it reads its contents and outputs
>>>> lines of the file using *outputWithTimestamp(..., timestamp)*.
>>>> Additionally, it calls *watermarkEstimator.setWatermark(timestamp)*.
>>>> In both cases the timestamp is *yyyy-MM-ddT00:00:00.000Z*.
>>>>
>>>> Once the pipeline matches *yyyy-MM-dd.complete* (which is empty) it
>>>> calls *watermarkEstimator.setWatermark(timestamp)*, where timestamp is
>>>> *yyyy-MM-ddT00:00:00.000Z plus one day*, so the watermark advances to the
>>>> next day.
>>>>
>>>> I am at the point where the following unit test fails the inWindow()
>>>> assertions, while the last assertion passes. It seems that even though I
>>>> call watermarkEstimator.setWatermark() the window is not being closed.
>>>>
>>>> I would appreciate help/suggestions on what I am missing.
>>>>
>>>> Here is a unit test. The function being tested is getData() defined
>>>> below.
>>>>
>>>> public void testGetDataWithNewFiles() throws InterruptedException {
>>>>   final Duration duration = Duration.standardDays(1);
>>>>
>>>>   IntervalWindow firstWindow =
>>>>       new IntervalWindow(Instant.parse("2020-01-01T00:00:00.000Z"), duration);
>>>>   logger.info("first window {}", firstWindow);
>>>>   IntervalWindow secondWindow =
>>>>       new IntervalWindow(Instant.parse("2020-01-02T00:00:00.000Z"), duration);
>>>>   logger.info("second window {}", secondWindow);
>>>>
>>>>   MatchConfiguration matchConfiguration =
>>>>       MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)
>>>>           .continuously(
>>>>               Duration.millis(100),
>>>>               Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(5)));
>>>>
>>>>   PCollection<KV<String, Long>> output =
>>>>       FileProcessing.getData(p, tmpFolder.getRoot().getAbsolutePath() + "/*", matchConfiguration)
>>>>           .apply("Window", Window.into(FixedWindows.of(duration)))
>>>>           .apply("LogWindowedResult", Log.ofElements("testGetData"));
>>>>
>>>>   assertEquals(PCollection.IsBounded.UNBOUNDED, output.isBounded());
>>>>
>>>>   Thread writer =
>>>>       new Thread(
>>>>           () -> {
>>>>             try {
>>>>               Thread.sleep(1000);
>>>>
>>>>               Path firstPath = tmpFolder.newFile("2020-01-01").toPath();
>>>>               Files.write(firstPath, Arrays.asList("1", "2", "3"));
>>>>
>>>>               Thread.sleep(1000);
>>>>
>>>>               Path firstPathComplete = tmpFolder.newFile("2020-01-01.complete").toPath();
>>>>               Files.write(firstPathComplete, Arrays.asList());
>>>>
>>>>               Thread.sleep(1000);
>>>>
>>>>               Path secondPath = tmpFolder.newFile("2020-01-02").toPath();
>>>>               Files.write(secondPath, Arrays.asList("4", "5", "6"));
>>>>
>>>>               Thread.sleep(1000);
>>>>
>>>>               Path secondPathComplete = tmpFolder.newFile("2020-01-02.complete").toPath();
>>>>               Files.write(secondPathComplete, Arrays.asList());
>>>>
>>>>             } catch (IOException | InterruptedException e) {
>>>>               throw new RuntimeException(e);
>>>>             }
>>>>           });
>>>>   writer.start();
>>>>
>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>   PAssert.that(output)
>>>>       .inWindow(firstWindow)
>>>>       .containsInAnyOrder(KV.of("my-key", 1L), KV.of("my-key", 2L), KV.of("my-key", 3L));
>>>>
>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>   PAssert.that(output)
>>>>       .inWindow(secondWindow)
>>>>       .containsInAnyOrder(KV.of("my-key", 4L), KV.of("my-key", 5L), KV.of("my-key", 6L));
>>>>
>>>>   // THIS ASSERTION PASSES.
>>>>   PAssert.that(output)
>>>>       .containsInAnyOrder(
>>>>           KV.of("my-key", 1L),
>>>>           KV.of("my-key", 2L),
>>>>           KV.of("my-key", 3L),
>>>>           KV.of("my-key", 4L),
>>>>           KV.of("my-key", 5L),
>>>>           KV.of("my-key", 6L));
>>>>
>>>>   p.run();
>>>>
>>>>   writer.join();
>>>> }
>>>>
>>>> Here is the code. Essentially, I am using *FileIO.match()* to match
>>>> filepattern. Then the file *Metadata* is processed by my custom
>>>> Splittable DoFn.
>>>>
>>>> static PCollection<KV<String, Long>> getData(
>>>>     Pipeline pipeline, String filepattern, MatchConfiguration matchConfiguration) {
>>>>   PCollection<Metadata> matches =
>>>>       pipeline.apply(
>>>>           FileIO.match().filepattern(filepattern).withConfiguration(matchConfiguration));
>>>>   return matches.apply(ParDo.of(new ReadFileFn())).apply(Log.ofElements("Get Data"));
>>>> }
>>>>
>>>> /**
>>>>  * Processes matched files by outputting key-value pairs where the key is "my-key" and the values
>>>>  * are the Long values parsed from the lines of the file. If a line is not a valid Long,
>>>>  * Long.parseLong throws NumberFormatException.
>>>>  */
>>>> @DoFn.BoundedPerElement
>>>> private static final class ReadFileFn extends DoFn<Metadata, KV<String, Long>> {
>>>>   private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);
>>>>
>>>>   @ProcessElement
>>>>   public void processElement(
>>>>       ProcessContext c,
>>>>       RestrictionTracker<OffsetRange, Long> tracker,
>>>>       ManualWatermarkEstimator<Instant> watermarkEstimator)
>>>>       throws IOException {
>>>>     Metadata metadata = c.element();
>>>>     logger.info(
>>>>         "reading {} with restriction {} @ {}",
>>>>         metadata,
>>>>         tracker.currentRestriction(),
>>>>         c.timestamp());
>>>>     String filename = metadata.resourceId().toString();
>>>>     Instant timestamp = getTimestamp(filename);
>>>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>>       String line;
>>>>       for (long lineNumber = 0; (line = br.readLine()) != null; ++lineNumber) {
>>>>         if (lineNumber < tracker.currentRestriction().getFrom()
>>>>             || lineNumber >= tracker.currentRestriction().getTo()) {
>>>>           continue;
>>>>         }
>>>>         if (!tracker.tryClaim(lineNumber)) {
>>>>           logger.info("failed to claim {}", lineNumber);
>>>>           return;
>>>>         }
>>>>         c.outputWithTimestamp(KV.of("my-key", Long.parseLong(line)), timestamp);
>>>>       }
>>>>     }
>>>>     logger.info("setting watermark to {}", timestamp);
>>>>     watermarkEstimator.setWatermark(timestamp);
>>>>     logger.info("Finish processing {} in file {}", tracker.currentRestriction(), filename);
>>>>   }
>>>>
>>>>   private Instant getTimestamp(String filepath) {
>>>>     // Filename is assumed to be either yyyy-MM-dd or yyyy-MM-dd.complete.
>>>>     String filename = Paths.get(filepath).getFileName().toString();
>>>>     int index = filename.lastIndexOf(".complete");
>>>>     if (index != -1) {
>>>>       // In the case it has a suffix, strip it.
>>>>       filename = filename.substring(0, index);
>>>>     }
>>>>     Instant timestamp =
>>>>         Instant.parse(new StringBuilder().append(filename).append("T00:00:00.000Z").toString());
>>>>     if (index != -1) {
>>>>       // In the case it has a suffix i.e. it is complete, fast forward to the next day.
>>>>       return timestamp.plus(Duration.standardDays(1));
>>>>     }
>>>>     return timestamp;
>>>>   }
>>>>
>>>>   @GetInitialRestriction
>>>>   public OffsetRange getInitialRestriction(@Element Metadata metadata) throws IOException {
>>>>     long lineCount;
>>>>     try (Stream<String> stream = Files.lines(Paths.get(metadata.resourceId().toString()))) {
>>>>       lineCount = stream.count();
>>>>     }
>>>>     return new OffsetRange(0L, lineCount);
>>>>   }
>>>>
>>>>   @GetInitialWatermarkEstimatorState
>>>>   public Instant getInitialWatermarkEstimatorState(
>>>>       @Element Metadata metadata, @Restriction OffsetRange restriction) {
>>>>     String filename = metadata.resourceId().toString();
>>>>     logger.info("getInitialWatermarkEstimatorState {}", filename);
>>>>     // Compute and return the initial watermark estimator state for each element and restriction.
>>>>     // All subsequent processing of an element and restriction will be restored from the existing
>>>>     // state.
>>>>     return getTimestamp(filename);
>>>>   }
>>>>
>>>>   private static Instant ensureTimestampWithinBounds(Instant timestamp) {
>>>>     if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
>>>>       timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
>>>>     } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
>>>>       timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
>>>>     }
>>>>     return timestamp;
>>>>   }
>>>>
>>>>   @NewWatermarkEstimator
>>>>   public WatermarkEstimators.Manual newWatermarkEstimator(
>>>>       @WatermarkEstimatorState Instant watermarkEstimatorState) {
>>>>     logger.info("newWatermarkEstimator {}", watermarkEstimatorState);
>>>>     return new WatermarkEstimators.Manual(ensureTimestampWithinBounds(watermarkEstimatorState));
>>>>   }
>>>> }
>>>>
>>>>
>>>>
>>>> On Thu, Oct 8, 2020 at 2:15 PM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> I'm working on a blog post[1] about splittable dofns that covers this
>>>>> topic.
>>>>>
>>>>> The TLDR; is that FileIO.match() should allow users to control the
>>>>> watermark estimator that is used and for your use case you should hold the
>>>>> watermark to some computable value (e.g. the files are generated every hour
>>>>> so once you know the last file has appeared for that hour you advance the
>>>>> watermark to the current hour).
>>>>>
>>>>> 1:
>>>>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#heading=h.fo3wm9qs0vql
>>>>>
>>>>> On Thu, Oct 8, 2020 at 1:55 PM Piotr Filipiuk <
>>>>> piotr.filipiuk@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am looking into:
>>>>>> https://beam.apache.org/documentation/patterns/file-processing/
>>>>>> since I would like to create a continuous pipeline that reads from files
>>>>>> and assigns Event Times based on e.g. file metadata or actual data inside
>>>>>> the file. For example:
>>>>>>
>>>>>> private static void run(String[] args) {
>>>>>>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>>>>>>   Pipeline pipeline = Pipeline.create(options);
>>>>>>
>>>>>>   PCollection<Metadata> matches = pipeline
>>>>>>       .apply(FileIO.match()
>>>>>>           .filepattern("/tmp/input/*")
>>>>>>           .continuously(Duration.standardSeconds(15), Watch.Growth.never()));
>>>>>>   matches
>>>>>>       .apply(ParDo.of(new ReadFileFn()));
>>>>>>
>>>>>>   pipeline.run();
>>>>>> }
>>>>>>
>>>>>> private static final class ReadFileFn extends DoFn<Metadata, String> {
>>>>>>   private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);
>>>>>>
>>>>>>   @ProcessElement
>>>>>>   public void processElement(ProcessContext c) throws IOException {
>>>>>>     Metadata metadata = c.element();
>>>>>>     // I believe c.timestamp() is based on processing time.
>>>>>>     logger.info("reading {} @ {}", metadata, c.timestamp());
>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>     // Output timestamps must be no earlier than the timestamp of the
>>>>>>     // current input minus the allowed skew (0 milliseconds).
>>>>>>     Instant timestamp = new Instant(metadata.lastModifiedMillis());
>>>>>>     logger.info("lastModified @ {}", timestamp);
>>>>>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>>>>       String line;
>>>>>>       while ((line = br.readLine()) != null) {
>>>>>>         c.outputWithTimestamp(line, c.timestamp());
>>>>>>       }
>>>>>>     }
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> The issue is that when calling c.outputWithTimestamp() I am getting:
>>>>>>
>>>>>> Caused by: java.lang.IllegalArgumentException: Cannot output with
>>>>>> timestamp 1970-01-01T00:00:00.000Z. Output timestamps must be no earlier
>>>>>> than the timestamp of the current input (2020-10-08T20:39:44.286Z) minus
>>>>>> the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
>>>>>> Javadoc for details on changing the allowed skew.
>>>>>>
>>>>>> I believe this is because MatchPollFn.apply() uses Instant.now() as
>>>>>> the event time for the PCollection<Metadata>. I can see that the
>>>>>> call to continuously() makes the PCollection unbounded and assigns
>>>>>> default Event Time. Without the call to continuously() I can assign the
>>>>>> timestamps without problems either via c.outputWithTimestamp or
>>>>>> WithTimestamp transform.
>>>>>>
>>>>>> I would like to know what is the way to fix the issue, and whether
>>>>>> this use-case is currently supported in Beam.
>>>>>>
>>>>>> --
>>>>>> Best regards,
>>>>>> Piotr
>>>>>>

Re: Processing files as they arrive with custom timestamps

Posted by Piotr Filipiuk <pi...@gmail.com>.
Thank you so much for the input, that was extremely helpful!

I changed the pipeline from using FileIO.match() into using a custom
matching (very similar to the FileIO.match()) that looks as follows:

static PCollection<KV<String, Long>> getData(
    Pipeline pipeline,
    String filepattern,
    Duration pollInterval,
    TerminationCondition terminationCondition) {
  final Growth<String, MatchResult.Metadata, String> stringMetadataStringGrowth =
      Watch.growthOf(
              Contextful.of(new MatchPollFn(), Requirements.empty()), new ExtractFilenameFn())
          .withPollInterval(pollInterval)
          .withTerminationPerInput(terminationCondition);
  return pipeline
      .apply("Create filepattern", Create.<String>of(filepattern))
      .apply("Continuously match filepatterns", stringMetadataStringGrowth)
      .apply(Values.create())
      .apply(ParDo.of(new ReadFileFn()));
}

private static class MatchPollFn extends PollFn<String, Metadata> {
  private static final Logger logger = LoggerFactory.getLogger(MatchPollFn.class);

  @Override
  public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
      throws Exception {
    // Here we only have the filepattern i.e. element, and hence we do not know what the timestamp
    // and/or watermark should be. As a result, we output EPOCH as both the timestamp and the
    // watermark.
    Instant instant = Instant.EPOCH;
    return Watch.Growth.PollResult.incomplete(
            instant, FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata())
        .withWatermark(instant);
  }
}

private static class ExtractFilenameFn implements SerializableFunction<Metadata, String> {
  @Override
  public String apply(MatchResult.Metadata input) {
    return input.resourceId().toString();
  }
}

The above, together with fixing the bugs that Luke pointed out (thank you,
Luke!), makes the unit test pass.
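
For anyone following along, here is a minimal, Beam-free sketch of the loop
shape Luke suggested. ClaimTracker and ManualWatermark are hypothetical
stand-ins written for this example, not Beam's RestrictionTracker or
ManualWatermarkEstimator. The point is only that driving the loop with
tryClaim leaves a single exit path, so the watermark is set whether the loop
ends at end of input or on a failed claim:

```java
import java.util.ArrayList;
import java.util.List;

/** Beam-free sketch of a claim-driven read loop with a single exit path. */
final class ClaimLoopSketch {
  /** Hypothetical stand-in for a tracker over the half-open range [from, to). */
  static final class ClaimTracker {
    private final long from;
    private final long to;

    ClaimTracker(long from, long to) {
      this.from = from;
      this.to = to;
    }

    long from() {
      return from;
    }

    /** Returns true while the offset is still inside the range. */
    boolean tryClaim(long offset) {
      return offset >= from && offset < to;
    }
  }

  /** Hypothetical stand-in for a manually set watermark, in epoch millis. */
  static final class ManualWatermark {
    private long millis = Long.MIN_VALUE;

    void set(long newMillis) {
      millis = newMillis;
    }

    long get() {
      return millis;
    }
  }

  static List<String> readClaimed(
      List<String> lines, ClaimTracker tracker, ManualWatermark watermark, long timestampMillis) {
    List<String> out = new ArrayList<>();
    long offset = tracker.from();
    // Start at the restriction's first offset and keep going until a claim fails
    // (or the input runs out), rather than comparing against the range end here.
    while (offset < lines.size() && tracker.tryClaim(offset)) {
      out.add(lines.get((int) offset));
      offset++;
    }
    // Single exit path: the watermark is always set right before returning.
    watermark.set(timestampMillis);
    return out;
  }
}
```

This is simplified: a real splittable DoFn would read from the file and the
runner may shrink the restriction between claims.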

Thank you again!

If you have any feedback on the current code, I would appreciate it. I am
especially interested in whether setting the event time and watermark in
*MatchPollFn* to *EPOCH* is the right approach.


On Wed, Oct 14, 2020 at 9:49 AM Reuven Lax <re...@google.com> wrote:

> FYI this is a major limitation in FileIO.match's watermarking ability. I
> believe there is a JIRA issue about this, but nobody has ever worked on
> improving it.
>
> On Wed, Oct 14, 2020 at 9:38 AM Luke Cwik <lc...@google.com> wrote:
>
>> FileIO.match doesn't allow one to configure how the watermark advances
>> and it assumes that the watermark during polling is always the current
>> system time[1].
>>
>> Because of this the downstream watermark advancement is limited. When an
>> element and restriction starts processing, the maximum you can hold the
>> output watermark back by for this element and restriction pair is limited
>> to the current input watermark (a common value to use is the current
>> element's timestamp as the lower bound for all future output but if that
>> element is late the output you produce may or may not be late (depends on
>> downstream windowing strategy)). Holding this watermark back is important
>> since many of these elements and restrictions could be processed in
>> parallel at different rates.
>>
>> Based upon your implementation, you wouldn't need to control the
>> watermark from the file reading splittable DoFn if FileIO.match allowed you
>> to say what the watermark is after each polling round and allowed you to
>> set the timestamp for each match found. This initial setting of the
>> watermark during polling would be properly handled by the runner to block
>> watermark advancement for those elements.
>>
>> Minor comments that are not related to your issue but would improve your
>> implementation:
>> 1) Typically you set the watermark right before returning. You are
>> missing this on the return taken when tryClaim fails inside the loop.
>> 2) You should structure your loop not based upon the end of the current
>> restriction but continue processing till tryClaim fails. For example:
>>       @ProcessElement
>>       public void processElement(
>>           @Element String fileName,
>>           RestrictionTracker<OffsetRange, Long> tracker,
>>           OutputReceiver<Integer> outputReceiver) throws IOException {
>>         RandomAccessFile file = new RandomAccessFile(fileName, "r");
>>         seekToNextRecordBoundaryInFile(file, tracker.currentRestriction().getFrom());
>>         while (tracker.tryClaim(file.getFilePointer())) {
>>           outputReceiver.output(readNextRecord(file));
>>         }
>>       }
>> 3) ensureTimestampWithinBounds is dangerous as you're masking a possible
>> data issue, since the code may have parsed some filename incorrectly. It is
>> likely that you copied this from Beam code and it is used there because
>> user implementations of UnboundedSource were incorrectly setting the
>> watermark outside of the bounds and there is no way to fix them.
>>
>> 1:
>> https://github.com/apache/beam/blob/29787b38b594e29428adaf230b45f9b33e24fa66/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L666
>>
>> On Tue, Oct 13, 2020 at 6:04 PM Piotr Filipiuk <pi...@gmail.com>
>> wrote:
>>
>>> Thank you for a quick response. I tried to follow the doc attached and
>>> read existing Beam code that uses the Splittable DoFns, and I made some
>>> progress.
>>>
>>> I created a simple pipeline that matches given filepattern, and uses
>>> splittable dofn to control event times and watermarks. The pipeline expects
>>> files with the following name patterns:
>>>
>>> *yyyy-MM-dd*
>>>
>>> *yyyy-MM-dd.complete*
>>>
>>> Every time it sees *yyyy-MM-dd*, it reads its contents and outputs
>>> lines of the file using *outputWithTimestamp(..., timestamp)*.
>>> Additionally, it calls *watermarkEstimator.setWatermark(timestamp)*. In
>>> both cases the timestamp is *yyyy-MM-ddT00:00:00.000Z*.
>>>
>>> Once the pipeline matches *yyyy-MM-dd.complete* (which is empty) it
>>> calls *watermarkEstimator.setWatermark(timestamp)*, where timestamp is
>>> *yyyy-MM-ddT00:00:00.000Z plus one day*, so the watermark advances to the
>>> next day.
>>>
>>> I am at the point where the following unit test fails the inWindow()
>>> assertions, while the last assertion passes. It seems that even though I
>>> call watermarkEstimator.setWatermark() the window is not being closed.
>>>
>>> I would appreciate help/suggestions on what I am missing.
>>>
>>> Here is a unit test. The function being tested is getData() defined
>>> below.
>>>
>>> public void testGetDataWithNewFiles() throws InterruptedException {
>>>   final Duration duration = Duration.standardDays(1);
>>>
>>>   IntervalWindow firstWindow =
>>>       new IntervalWindow(Instant.parse("2020-01-01T00:00:00.000Z"), duration);
>>>   logger.info("first window {}", firstWindow);
>>>   IntervalWindow secondWindow =
>>>       new IntervalWindow(Instant.parse("2020-01-02T00:00:00.000Z"), duration);
>>>   logger.info("second window {}", secondWindow);
>>>
>>>   MatchConfiguration matchConfiguration =
>>>       MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)
>>>           .continuously(
>>>               Duration.millis(100),
>>>               Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(5)));
>>>
>>>   PCollection<KV<String, Long>> output =
>>>       FileProcessing.getData(p, tmpFolder.getRoot().getAbsolutePath() + "/*", matchConfiguration)
>>>           .apply("Window", Window.into(FixedWindows.of(duration)))
>>>           .apply("LogWindowedResult", Log.ofElements("testGetData"));
>>>
>>>   assertEquals(PCollection.IsBounded.UNBOUNDED, output.isBounded());
>>>
>>>   Thread writer =
>>>       new Thread(
>>>           () -> {
>>>             try {
>>>               Thread.sleep(1000);
>>>
>>>               Path firstPath = tmpFolder.newFile("2020-01-01").toPath();
>>>               Files.write(firstPath, Arrays.asList("1", "2", "3"));
>>>
>>>               Thread.sleep(1000);
>>>
>>>               Path firstPathComplete = tmpFolder.newFile("2020-01-01.complete").toPath();
>>>               Files.write(firstPathComplete, Arrays.asList());
>>>
>>>               Thread.sleep(1000);
>>>
>>>               Path secondPath = tmpFolder.newFile("2020-01-02").toPath();
>>>               Files.write(secondPath, Arrays.asList("4", "5", "6"));
>>>
>>>               Thread.sleep(1000);
>>>
>>>               Path secondPathComplete = tmpFolder.newFile("2020-01-02.complete").toPath();
>>>               Files.write(secondPathComplete, Arrays.asList());
>>>
>>>             } catch (IOException | InterruptedException e) {
>>>               throw new RuntimeException(e);
>>>             }
>>>           });
>>>   writer.start();
>>>
>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>   PAssert.that(output)
>>>       .inWindow(firstWindow)
>>>       .containsInAnyOrder(KV.of("my-key", 1L), KV.of("my-key", 2L), KV.of("my-key", 3L));
>>>
>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>   PAssert.that(output)
>>>       .inWindow(secondWindow)
>>>       .containsInAnyOrder(KV.of("my-key", 4L), KV.of("my-key", 5L), KV.of("my-key", 6L));
>>>
>>>   // THIS ASSERTION PASSES.
>>>   PAssert.that(output)
>>>       .containsInAnyOrder(
>>>           KV.of("my-key", 1L),
>>>           KV.of("my-key", 2L),
>>>           KV.of("my-key", 3L),
>>>           KV.of("my-key", 4L),
>>>           KV.of("my-key", 5L),
>>>           KV.of("my-key", 6L));
>>>
>>>   p.run();
>>>
>>>   writer.join();
>>> }
>>>
>>> Here is the code. Essentially, I am using *FileIO.match()* to match
>>> filepattern. Then the file *Metadata* is processed by my custom
>>> Splittable DoFn.
>>>
>>> static PCollection<KV<String, Long>> getData(
>>>     Pipeline pipeline, String filepattern, MatchConfiguration matchConfiguration) {
>>>   PCollection<Metadata> matches =
>>>       pipeline.apply(
>>>           FileIO.match().filepattern(filepattern).withConfiguration(matchConfiguration));
>>>   return matches.apply(ParDo.of(new ReadFileFn())).apply(Log.ofElements("Get Data"));
>>> }
>>>
>>> /**
>>>  * Processes matched files by outputting key-value pairs where key is equal to "my-key" and values
>>>  * are Long values corresponding to the lines in the file. In the case file does not contain one
>>>  * Long per line, IOException is thrown.
>>>  */
>>> @DoFn.BoundedPerElement
>>> private static final class ReadFileFn extends DoFn<Metadata, KV<String, Long>> {
>>>   private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);
>>>
>>>   @ProcessElement
>>>   public void processElement(
>>>       ProcessContext c,
>>>       RestrictionTracker<OffsetRange, Long> tracker,
>>>       ManualWatermarkEstimator<Instant> watermarkEstimator)
>>>       throws IOException {
>>>     Metadata metadata = c.element();
>>>     logger.info(
>>>         "reading {} with restriction {} @ {}",
>>>         metadata,
>>>         tracker.currentRestriction(),
>>>         c.timestamp());
>>>     String filename = metadata.resourceId().toString();
>>>     Instant timestamp = getTimestamp(filename);
>>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>       String line;
>>>       for (long lineNumber = 0; (line = br.readLine()) != null; ++lineNumber) {
>>>         if (lineNumber < tracker.currentRestriction().getFrom()
>>>             || lineNumber >= tracker.currentRestriction().getTo()) {
>>>           continue;
>>>         }
>>>         if (!tracker.tryClaim(lineNumber)) {
>>>           logger.info("failed to claim {}", lineNumber);
>>>           return;
>>>         }
>>>         c.outputWithTimestamp(KV.of("my-key", Long.parseLong(line)), timestamp);
>>>       }
>>>     }
>>>     logger.info("setting watermark to {}", timestamp);
>>>     watermarkEstimator.setWatermark(timestamp);
>>>     logger.info("Finish processing {} in file {}", tracker.currentRestriction(), filename);
>>>   }
>>>
>>>   private Instant getTimestamp(String filepath) {
>>>     // Filename is assumed to be either yyyy-MM-dd or yyyy-MM-dd.complete.
>>>     String filename = Paths.get(filepath).getFileName().toString();
>>>     int index = filename.lastIndexOf(".complete");
>>>     if (index != -1) {
>>>       // In the case it has a suffix, strip it.
>>>       filename = filename.substring(0, index);
>>>     }
>>>     Instant timestamp =
>>>         Instant.parse(new StringBuilder().append(filename).append("T00:00:00.000Z").toString());
>>>     if (index != -1) {
>>>       // In the case it has a suffix i.e. it is complete, fast forward to the next day.
>>>       return timestamp.plus(Duration.standardDays(1));
>>>     }
>>>     return timestamp;
>>>   }
>>>
>>>   @GetInitialRestriction
>>>   public OffsetRange getInitialRestriction(@Element Metadata metadata) throws IOException {
>>>     long lineCount;
>>>     try (Stream<String> stream = Files.lines(Paths.get(metadata.resourceId().toString()))) {
>>>       lineCount = stream.count();
>>>     }
>>>     return new OffsetRange(0L, lineCount);
>>>   }
>>>
>>>   @GetInitialWatermarkEstimatorState
>>>   public Instant getInitialWatermarkEstimatorState(
>>>       @Element Metadata metadata, @Restriction OffsetRange restriction) {
>>>     String filename = metadata.resourceId().toString();
>>>     logger.info("getInitialWatermarkEstimatorState {}", filename);
>>>     // Compute and return the initial watermark estimator state for each element and restriction.
>>>     // All subsequent processing of an element and restriction will be restored from the existing
>>>     // state.
>>>     return getTimestamp(filename);
>>>   }
>>>
>>>   private static Instant ensureTimestampWithinBounds(Instant timestamp) {
>>>     if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
>>>       timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
>>>     } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
>>>       timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
>>>     }
>>>     return timestamp;
>>>   }
>>>
>>>   @NewWatermarkEstimator
>>>   public WatermarkEstimators.Manual newWatermarkEstimator(
>>>       @WatermarkEstimatorState Instant watermarkEstimatorState) {
>>>     logger.info("newWatermarkEstimator {}", watermarkEstimatorState);
>>>     return new WatermarkEstimators.Manual(ensureTimestampWithinBounds(watermarkEstimatorState));
>>>   }
>>> }
>>>
>>> --
>>> Best regards,
>>> Piotr
>>>
>>

-- 
Best regards,
Piotr

Re: Processing files as they arrive with custom timestamps

Posted by Reuven Lax <re...@google.com>.
FYI this is a major limitation in FileIO.match's watermarking ability. I
believe there is a JIRA issue about this, but nobody has ever worked on
improving it.


Re: Processing files as they arrive with custom timestamps

Posted by Luke Cwik <lc...@google.com>.
FileIO.match doesn't allow one to configure how the watermark advances and
it assumes that the watermark during polling is always the current system
time[1].
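
To make the consequence for a plain DoFn concrete, here is a minimal
sketch of the timestamp check described by the exception message at the
start of this thread. It is a toy model, not Beam's actual code: the class
and method names are made up for illustration, and java.time stands in for
the Joda-Time types that Beam uses.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy model of the output-timestamp check; not Beam's implementation. */
final class TimestampSkewSketch {

  /** Returns true if an output timestamp would pass the check for this input. */
  static boolean isAllowed(Instant output, Instant input, Duration allowedSkew) {
    // The earliest permitted output timestamp is the input timestamp minus the skew.
    return !output.isBefore(input.minus(allowedSkew));
  }

  public static void main(String[] args) {
    // Timestamp of the matched file element, taken from the reported error.
    Instant matchTimestamp = Instant.parse("2020-10-08T20:39:44.286Z");
    // The output timestamp that was rejected in the reported error.
    Instant eventTime = Instant.parse("1970-01-01T00:00:00Z");

    // An earlier event time is rejected when the allowed skew is zero.
    System.out.println(isAllowed(eventTime, matchTimestamp, Duration.ZERO));
    // Re-using the input timestamp is always accepted.
    System.out.println(isAllowed(matchTimestamp, matchTimestamp, Duration.ZERO));
  }
}
```

Since the matched elements carry roughly the polling time, any event time
taken from the file name or file contents will normally be earlier than
the input timestamp and fail a check like this one.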

Because of this, downstream watermark advancement is limited. When an
element and restriction pair starts processing, the furthest back you can
hold the output watermark for that pair is the current input watermark. A
common choice is to use the current element's timestamp as the lower bound
for all future output, but if that element is late, the output you produce
may or may not be late, depending on the downstream windowing strategy.
Holding the watermark back is important because many of these element and
restriction pairs could be processed in parallel at different rates.
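
One way to read the paragraph above is as the following toy model. It is
only a sketch under my own assumptions, not runner code: the names
effectiveHold and outputWatermark are hypothetical, java.time stands in for
Joda-Time, and real runners track considerably more state.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

/** Toy model of watermark holds for parallel element/restriction pairs. */
final class WatermarkHoldSketch {

  /** A hold cannot reach further back than the input watermark seen at start. */
  static Instant effectiveHold(Instant requestedHold, Instant inputWatermarkAtStart) {
    return requestedHold.isBefore(inputWatermarkAtStart) ? inputWatermarkAtStart : requestedHold;
  }

  /** The output watermark is the minimum of the input watermark and all in-flight holds. */
  static Instant outputWatermark(Instant inputWatermark, List<Instant> inFlightHolds) {
    Instant result = inputWatermark;
    for (Instant hold : inFlightHolds) {
      if (hold.isBefore(result)) {
        result = hold;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // With FileIO.match the input watermark is roughly the polling time.
    Instant pollTime = Instant.parse("2020-10-14T00:00:00Z");
    Instant wantedHold = Instant.parse("2020-01-01T00:00:00Z");
    // The requested hold is earlier than the input watermark, so it has no effect.
    System.out.println(effectiveHold(wantedHold, pollTime));

    // Two pairs processed in parallel: the slower one determines the output watermark.
    Instant slow = Instant.parse("2020-10-14T09:00:00Z");
    Instant fast = Instant.parse("2020-10-14T10:00:00Z");
    Instant inputNow = Instant.parse("2020-10-14T12:00:00Z");
    System.out.println(outputWatermark(inputNow, Arrays.asList(fast, slow)));
  }
}
```

In this model a hold derived from a file name such as 2020-01-01 cannot
pull the watermark back once the input watermark is already at the polling
time.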

Based upon your implementation, you wouldn't need to control the watermark
from the file reading splittable DoFn if FileIO.match allowed you to say
what the watermark is after each polling round and allowed you to set the
timestamp for each match found. This initial setting of the watermark
during polling would be properly handled by the runner to block watermark
advancement for those elements.

Minor comments that are not related to your issue but would improve your
implementation:
1) Typically you set the watermark right before returning. You are missing
this on the return path taken when tryClaim fails.
2) You should not structure your loop around the end of the current
restriction; instead, continue processing until tryClaim fails. For example:
      @ProcessElement
      public void processElement(
          @Element String fileName,
          RestrictionTracker<OffsetRange, Long> tracker,
          OutputReceiver<Integer> outputReceiver)
          throws IOException {
        // seekToNextRecordBoundaryInFile and readNextRecord are placeholders
        // for format-specific helpers.
        try (RandomAccessFile file = new RandomAccessFile(fileName, "r")) {
          seekToNextRecordBoundaryInFile(file, tracker.currentRestriction().getFrom());
          while (tracker.tryClaim(file.getFilePointer())) {
            outputReceiver.output(readNextRecord(file));
          }
        }
      }
3) ensureTimestampWithinBounds is dangerous because it masks a possible
data issue, such as the code having parsed some filename incorrectly. You
likely copied this from Beam code, where it is used because user
implementations of UnboundedSource were incorrectly setting the watermark
outside of the bounds and there is no way to fix them.
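
Comments 1) and 2) can be combined: if the loop runs until tryClaim fails,
there is a single exit point, and setting the watermark there covers every
return. The sketch below shows that shape without any Beam dependency.
ToyTracker, ToyWatermark and process are hypothetical stand-ins for the
restriction tracker, the manual watermark estimator and the
@ProcessElement method; they only mimic the parts needed here, and the
restriction is assumed not to extend past the number of lines.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ClaimLoopSketch {

  /** Hypothetical stand-in for a tracker over line numbers [from, to). */
  static final class ToyTracker {
    private final long from;
    private long to;

    ToyTracker(long from, long to) {
      this.from = from;
      this.to = to;
    }

    long from() {
      return from;
    }

    /** Simulates a runner-initiated split that shrinks the end of the range. */
    void shrinkTo(long newTo) {
      to = Math.min(to, newTo);
    }

    boolean tryClaim(long position) {
      return position >= from && position < to;
    }
  }

  /** Hypothetical stand-in for a manual watermark estimator. */
  static final class ToyWatermark {
    Instant value;

    void setWatermark(Instant watermark) {
      value = watermark;
    }
  }

  static List<Long> process(
      List<String> lines, ToyTracker tracker, ToyWatermark watermark, Instant timestamp) {
    List<Long> output = new ArrayList<>();
    // Start at the beginning of the restriction and continue until a claim fails.
    for (long i = tracker.from(); tracker.tryClaim(i); i++) {
      output.add(Long.parseLong(lines.get((int) i)));
    }
    // Single exit point: reached when the range is exhausted and after a split.
    watermark.setWatermark(timestamp);
    return output;
  }

  public static void main(String[] args) {
    ToyTracker tracker = new ToyTracker(0, 3);
    tracker.shrinkTo(2); // pretend the runner split off the last line
    ToyWatermark watermark = new ToyWatermark();
    List<Long> out =
        process(Arrays.asList("1", "2", "3"), tracker, watermark,
            Instant.parse("2020-01-01T00:00:00Z"));
    System.out.println(out + " watermark=" + watermark.value);
  }
}
```

The loop never looks at the end of the restriction itself, so a split that
shrinks the range is picked up by the next tryClaim call.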

1:
https://github.com/apache/beam/blob/29787b38b594e29428adaf230b45f9b33e24fa66/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L666
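
On comment 3), an alternative to clamping is to fail loudly when a file
name does not match the expected pattern. The sketch below is one possible
way to do that for the yyyy-MM-dd and yyyy-MM-dd.complete names used in
this thread. StrictFilenameTimestamp is a hypothetical helper, and it uses
java.time rather than the Joda-Time types from the original code.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeParseException;

final class StrictFilenameTimestamp {
  private static final String COMPLETE_SUFFIX = ".complete";

  /**
   * Maps "yyyy-MM-dd" to midnight UTC of that day and "yyyy-MM-dd.complete"
   * to midnight UTC of the following day. Any other name is rejected.
   */
  static Instant parse(String filename) {
    boolean complete = filename.endsWith(COMPLETE_SUFFIX);
    String datePart =
        complete
            ? filename.substring(0, filename.length() - COMPLETE_SUFFIX.length())
            : filename;
    LocalDate date;
    try {
      date = LocalDate.parse(datePart); // ISO-8601 date, i.e. yyyy-MM-dd
    } catch (DateTimeParseException e) {
      // Surface the bad name instead of silently clamping to a bound.
      throw new IllegalArgumentException("Unexpected file name: " + filename, e);
    }
    LocalDate effective = complete ? date.plusDays(1) : date;
    return effective.atStartOfDay(ZoneOffset.UTC).toInstant();
  }

  public static void main(String[] args) {
    System.out.println(parse("2020-01-01"));
    System.out.println(parse("2020-01-01.complete"));
  }
}
```

With this shape a malformed name stops the pipeline at the point where the
problem originates, rather than producing a watermark at one of the
window bounds.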

On Tue, Oct 13, 2020 at 6:04 PM Piotr Filipiuk <pi...@gmail.com>
wrote:

> Thank you for a quick response. I tried to follow the doc attached and
> read existing Beam code that uses the Splittable DoFns, and I made some
> progress.
>
> I created a simple pipeline that matches given filepattern, and uses
> splittable dofn to control event times and watermarks. The pipeline expects
> files with the following name patterns:
>
> *yyyy-MM-dd*
>
> *yyyy-MM-dd.complete*
>
> Every time it sees *yyyy-MM-dd*, it reads its contents and outputs lines
> of the file using *outputWithTimestamp(..., timestamp)*. Additionally, it
> calls *watermarkEstimator.setWatermark(timestamp)*. In both cases the
> timestamp is *yyyy-MM-ddT00:00:00.000Z*.
>
> Once the pipeline matches *yyyy-MM-dd.complete* (which is empty) it calls
> *watermarkEstimator.setWatermark(timestamp)*, where timestamp is *yyyy-MM-ddT00:00:00.000Z
> plus one day* - hence it advances to the next day.
>
> I am at the point when the following unit test fails the inWindow()
> assertions, the last assertion passes. It seems that even though I
> call watermarkEstimator.setWatermark() the window is not being closed.
>
> I would appreciate help/suggestions on what I am missing.
>
> Here is a unit test. The function being tested is getData() defined below.
>
> public void testGetDataWithNewFiles() throws InterruptedException {
>   final Duration duration = Duration.standardDays(1);
>
>   IntervalWindow firstWindow =
>       new IntervalWindow(Instant.parse("2020-01-01T00:00:00.000Z"), duration);
>   logger.info("first window {}", firstWindow);
>   IntervalWindow secondWindow =
>       new IntervalWindow(Instant.parse("2020-01-02T00:00:00.000Z"), duration);
>   logger.info("second window {}", secondWindow);
>
>   MatchConfiguration matchConfiguration =
>       MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)
>           .continuously(
>               Duration.millis(100),
>               Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(5)));
>
>   PCollection<KV<String, Long>> output =
>       FileProcessing.getData(p, tmpFolder.getRoot().getAbsolutePath() + "/*", matchConfiguration)
>           .apply("Window", Window.into(FixedWindows.of(duration)))
>           .apply("LogWindowedResult", Log.ofElements("testGetData"));
>
>   assertEquals(PCollection.IsBounded.UNBOUNDED, output.isBounded());
>
>   Thread writer =
>       new Thread(
>           () -> {
>             try {
>               Thread.sleep(1000);
>
>               Path firstPath = tmpFolder.newFile("2020-01-01").toPath();
>               Files.write(firstPath, Arrays.asList("1", "2", "3"));
>
>               Thread.sleep(1000);
>
>               Path firstPathComplete = tmpFolder.newFile("2020-01-01.complete").toPath();
>               Files.write(firstPathComplete, Arrays.asList());
>
>               Thread.sleep(1000);
>
>               Path secondPath = tmpFolder.newFile("2020-01-02").toPath();
>               Files.write(secondPath, Arrays.asList("4", "5", "6"));
>
>               Thread.sleep(1000);
>
>               Path secondPathComplete = tmpFolder.newFile("2020-01-02.complete").toPath();
>               Files.write(secondPathComplete, Arrays.asList());
>
>             } catch (IOException | InterruptedException e) {
>               throw new RuntimeException(e);
>             }
>           });
>   writer.start();
>
>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>   PAssert.that(output)
>       .inWindow(firstWindow)
>       .containsInAnyOrder(KV.of("my-key", 1L), KV.of("my-key", 2L), KV.of("my-key", 3L));
>
>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>   PAssert.that(output)
>       .inWindow(secondWindow)
>       .containsInAnyOrder(KV.of("my-key", 4L), KV.of("my-key", 5L), KV.of("my-key", 6L));
>
>   // THIS ASSERTION PASSES.
>   PAssert.that(output)
>       .containsInAnyOrder(
>           KV.of("my-key", 1L),
>           KV.of("my-key", 2L),
>           KV.of("my-key", 3L),
>           KV.of("my-key", 4L),
>           KV.of("my-key", 5L),
>           KV.of("my-key", 6L));
>
>   p.run();
>
>   writer.join();
> }
>
> Here is the code. Essentially, I am using *FileIO.match()* to match
> filepattern. Then the file *Metadata* is processed by my custom
> Splittable DoFn.
>
> static PCollection<KV<String, Long>> getData(
>     Pipeline pipeline, String filepattern, MatchConfiguration matchConfiguration) {
>   PCollection<Metadata> matches =
>       pipeline.apply(
>           FileIO.match().filepattern(filepattern).withConfiguration(matchConfiguration));
>   return matches.apply(ParDo.of(new ReadFileFn())).apply(Log.ofElements("Get Data"));
> }
>
> /**
>  * Processes matched files by outputting key-value pairs where key is equal to "my-key" and values
>  * are Long values corresponding to the lines in the file. In the case file does not contain one
>  * Long per line, IOException is thrown.
>  */
> @DoFn.BoundedPerElement
> private static final class ReadFileFn extends DoFn<Metadata, KV<String, Long>> {
>   private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);
>
>   @ProcessElement
>   public void processElement(
>       ProcessContext c,
>       RestrictionTracker<OffsetRange, Long> tracker,
>       ManualWatermarkEstimator<Instant> watermarkEstimator)
>       throws IOException {
>     Metadata metadata = c.element();
>     logger.info(
>         "reading {} with restriction {} @ {}",
>         metadata,
>         tracker.currentRestriction(),
>         c.timestamp());
>     String filename = metadata.resourceId().toString();
>     Instant timestamp = getTimestamp(filename);
>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>       String line;
>       for (long lineNumber = 0; (line = br.readLine()) != null; ++lineNumber) {
>         if (lineNumber < tracker.currentRestriction().getFrom()
>             || lineNumber >= tracker.currentRestriction().getTo()) {
>           continue;
>         }
>         if (!tracker.tryClaim(lineNumber)) {
>           logger.info("failed to claim {}", lineNumber);
>           return;
>         }
>         c.outputWithTimestamp(KV.of("my-key", Long.parseLong(line)), timestamp);
>       }
>     }
>     logger.info("setting watermark to {}", timestamp);
>     watermarkEstimator.setWatermark(timestamp);
>     logger.info("Finish processing {} in file {}", tracker.currentRestriction(), filename);
>   }
>
>   private Instant getTimestamp(String filepath) {
>     // Filename is assumed to be either yyyy-MM-dd or yyyy-MM-dd.complete.
>     String filename = Paths.get(filepath).getFileName().toString();
>     int index = filename.lastIndexOf(".complete");
>     if (index != -1) {
>       // In the case it has a suffix, strip it.
>       filename = filename.substring(0, index);
>     }
>     Instant timestamp =
>         Instant.parse(new StringBuilder().append(filename).append("T00:00:00.000Z").toString());
>     if (index != -1) {
>       // In the case it has a suffix i.e. it is complete, fast forward to the next day.
>       return timestamp.plus(Duration.standardDays(1));
>     }
>     return timestamp;
>   }
>
>   @GetInitialRestriction
>   public OffsetRange getInitialRestriction(@Element Metadata metadata) throws IOException {
>     long lineCount;
>     try (Stream<String> stream = Files.lines(Paths.get(metadata.resourceId().toString()))) {
>       lineCount = stream.count();
>     }
>     return new OffsetRange(0L, lineCount);
>   }
>
>   @GetInitialWatermarkEstimatorState
>   public Instant getInitialWatermarkEstimatorState(
>       @Element Metadata metadata, @Restriction OffsetRange restriction) {
>     String filename = metadata.resourceId().toString();
>     logger.info("getInitialWatermarkEstimatorState {}", filename);
>     // Compute and return the initial watermark estimator state for each element and restriction.
>     // All subsequent processing of an element and restriction will be restored from the existing
>     // state.
>     return getTimestamp(filename);
>   }
>
>   private static Instant ensureTimestampWithinBounds(Instant timestamp) {
>     if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
>       timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
>     } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
>       timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
>     }
>     return timestamp;
>   }
>
>   @NewWatermarkEstimator
>   public WatermarkEstimators.Manual newWatermarkEstimator(
>       @WatermarkEstimatorState Instant watermarkEstimatorState) {
>     logger.info("newWatermarkEstimator {}", watermarkEstimatorState);
>     return new WatermarkEstimators.Manual(ensureTimestampWithinBounds(watermarkEstimatorState));
>   }
> }
>
>
>
> On Thu, Oct 8, 2020 at 2:15 PM Luke Cwik <lc...@google.com> wrote:
>
>> I'm working on a blog post[1] about splittable dofns that covers this
>> topic.
>>
>> The TLDR; is that FileIO.match() should allow users to control the
>> watermark estimator that is used and for your use case you should hold the
>> watermark to some computable value (e.g. the files are generated every hour
>> so once you know the last file has appeared for that hour you advance the
>> watermark to the current hour).
>>
>> 1:
>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#heading=h.fo3wm9qs0vql
>>
>> On Thu, Oct 8, 2020 at 1:55 PM Piotr Filipiuk <pi...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am looking into:
>>> https://beam.apache.org/documentation/patterns/file-processing/ since I
>>> would like to create a continuous pipeline that reads from files and
>>> assigns Event Times based on e.g. file metadata or actual data inside the
>>> file. For example:
>>>
>>> private static void run(String[] args) {
>>>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>>>   Pipeline pipeline = Pipeline.create(options);
>>>
>>>   PCollection<Metadata> matches = pipeline
>>>       .apply(FileIO.match()
>>>           .filepattern("/tmp/input/*")
>>>           .continuously(Duration.standardSeconds(15), Watch.Growth.never()));
>>>   matches
>>>       .apply(ParDo.of(new ReadFileFn()))
>>>
>>>   pipeline.run();
>>> }
>>>
>>> private static final class ReadFileFn extends DoFn<Metadata, String> {
>>>   private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);
>>>
>>>   @ProcessElement
>>>   public void processElement(ProcessContext c) throws IOException {
>>>     Metadata metadata = c.element();
>>>     // I believe c.timestamp() is based on processing time.
>>>     logger.info("reading {} @ {}", metadata, c.timestamp());
>>>     String filename = metadata.resourceId().toString();
>>>     // Output timestamps must be no earlier than the timestamp of the
>>>     // current input minus the allowed skew (0 milliseconds).
>>>     Instant timestamp = new Instant(metadata.lastModifiedMillis());
>>>     logger.info("lastModified @ {}", timestamp);
>>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>       String line;
>>>       while ((line = br.readLine()) != null) {
>>>         c.outputWithTimestamp(line, c.timestamp());
>>>       }
>>>     }
>>>   }
>>> }
>>>
>>> The issue is that when calling c.outputWithTimestamp() I am getting:
>>>
>>> Caused by: java.lang.IllegalArgumentException: Cannot output with
>>> timestamp 1970-01-01T00:00:00.000Z. Output timestamps must be no earlier
>>> than the timestamp of the current input (2020-10-08T20:39:44.286Z) minus
>>> the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
>>> Javadoc for details on changing the allowed skew.
>>>
>>> I believe this is because MatchPollFn.apply() uses Instant.now() as the
>>> event time for the PCollection<Metadata>. I can see that the call to
>>> continuously() makes the PCollection unbounded and assigns default
>>> Event Time. Without the call to continuously() I can assign the timestamps
>>> without problems either via c.outputWithTimestamp or WithTimestamp
>>> transform.
>>>
>>> I would like to know what is the way to fix the issue, and whether this
>>> use-case is currently supported in Beam.
>>>
>>> --
>>> Best regards,
>>> Piotr
>>>
>>
>
> --
> Best regards,
> Piotr
>

Re: Processing files as they arrive with custom timestamps

Posted by Piotr Filipiuk <pi...@gmail.com>.
Thank you for the quick response. I tried to follow the attached doc and read
existing Beam code that uses Splittable DoFns, and I made some progress.

I created a simple pipeline that matches a given filepattern and uses a
splittable DoFn to control event times and watermarks. The pipeline expects
files with the following name patterns:

*yyyy-MM-dd*

*yyyy-MM-dd.complete*

Every time it sees *yyyy-MM-dd*, it reads the file's contents and outputs its
lines using *outputWithTimestamp(..., timestamp)*. It then calls
*watermarkEstimator.setWatermark(timestamp)*. In both cases the timestamp is
*yyyy-MM-ddT00:00:00.000Z*.

Once the pipeline matches *yyyy-MM-dd.complete* (which is empty), it calls
*watermarkEstimator.setWatermark(timestamp)*, where the timestamp is
*yyyy-MM-ddT00:00:00.000Z plus one day*, so the watermark advances to the
next day.
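
To make the naming convention concrete, here is a minimal sketch of the
mapping from file name to event time, watermark, and daily window. It uses
only java.time, not Beam. The class FileNameTimes and its method names are
hypothetical and exist only for illustration. It assumes one-day fixed
windows aligned to the epoch, which, as far as I know, is how
FixedWindows.of(duration) behaves by default.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

/** Hypothetical helper, not part of Beam: maps yyyy-MM-dd[.complete] file names to instants. */
final class FileNameTimes {
  private static final String COMPLETE_SUFFIX = ".complete";

  /** Event time for a file named yyyy-MM-dd or yyyy-MM-dd.complete: midnight UTC of that day. */
  static Instant eventTime(String fileName) {
    return LocalDate.parse(stripSuffix(fileName)).atStartOfDay(ZoneOffset.UTC).toInstant();
  }

  /** Watermark to report after the file: the same day for data, the next day for a marker. */
  static Instant watermarkAfter(String fileName) {
    Instant day = eventTime(fileName);
    return fileName.endsWith(COMPLETE_SUFFIX) ? day.plus(Duration.ofDays(1)) : day;
  }

  /** Start of the one-day window (aligned to the epoch) that contains the timestamp. */
  static Instant dailyWindowStart(Instant timestamp) {
    long dayMillis = Duration.ofDays(1).toMillis();
    long millis = timestamp.toEpochMilli();
    return Instant.ofEpochMilli(millis - Math.floorMod(millis, dayMillis));
  }

  private static String stripSuffix(String fileName) {
    return fileName.endsWith(COMPLETE_SUFFIX)
        ? fileName.substring(0, fileName.length() - COMPLETE_SUFFIX.length())
        : fileName;
  }
}
```

Under these assumptions, lines from the file 2020-01-01 get event time
2020-01-01T00:00:00Z and fall into the window starting on 2020-01-01, and
the marker 2020-01-01.complete moves the watermark to 2020-01-02T00:00:00Z,
which is the end of that window.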

I am at the point where the following unit test fails both inWindow()
assertions, while the last (unwindowed) assertion passes. It seems that even
though I call watermarkEstimator.setWatermark(), the window is not being closed.

I would appreciate help/suggestions on what I am missing.

Here is a unit test. The function being tested is getData() defined below.

public void testGetDataWithNewFiles() throws InterruptedException {
  final Duration duration = Duration.standardDays(1);

  IntervalWindow firstWindow =
      new IntervalWindow(Instant.parse("2020-01-01T00:00:00.000Z"), duration);
  logger.info("first window {}", firstWindow);
  IntervalWindow secondWindow =
      new IntervalWindow(Instant.parse("2020-01-02T00:00:00.000Z"), duration);
  logger.info("second window {}", secondWindow);

  MatchConfiguration matchConfiguration =
      MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)
          .continuously(
              Duration.millis(100),
              Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(5)));

  PCollection<KV<String, Long>> output =
      FileProcessing.getData(
              p, tmpFolder.getRoot().getAbsolutePath() + "/*", matchConfiguration)
          .apply("Window", Window.into(FixedWindows.of(duration)))
          .apply("LogWindowedResult", Log.ofElements("testGetData"));

  assertEquals(PCollection.IsBounded.UNBOUNDED, output.isBounded());

  Thread writer =
      new Thread(
          () -> {
            try {
              Thread.sleep(1000);

              Path firstPath = tmpFolder.newFile("2020-01-01").toPath();
              Files.write(firstPath, Arrays.asList("1", "2", "3"));

              Thread.sleep(1000);

              Path firstPathComplete = tmpFolder.newFile("2020-01-01.complete").toPath();
              Files.write(firstPathComplete, Arrays.asList());

              Thread.sleep(1000);

              Path secondPath = tmpFolder.newFile("2020-01-02").toPath();
              Files.write(secondPath, Arrays.asList("4", "5", "6"));

              Thread.sleep(1000);

              Path secondPathComplete = tmpFolder.newFile("2020-01-02.complete").toPath();
              Files.write(secondPathComplete, Arrays.asList());

            } catch (IOException | InterruptedException e) {
              throw new RuntimeException(e);
            }
          });
  writer.start();

  // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
  PAssert.that(output)
      .inWindow(firstWindow)
      .containsInAnyOrder(KV.of("my-key", 1L), KV.of("my-key", 2L), KV.of("my-key", 3L));

  // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
  PAssert.that(output)
      .inWindow(secondWindow)
      .containsInAnyOrder(KV.of("my-key", 4L), KV.of("my-key", 5L), KV.of("my-key", 6L));

  // THIS ASSERTION PASSES.
  PAssert.that(output)
      .containsInAnyOrder(
          KV.of("my-key", 1L),
          KV.of("my-key", 2L),
          KV.of("my-key", 3L),
          KV.of("my-key", 4L),
          KV.of("my-key", 5L),
          KV.of("my-key", 6L));

  p.run();

  writer.join();
}

Here is the code. Essentially, I am using *FileIO.match()* to match the
filepattern. The file *Metadata* is then processed by my custom Splittable
DoFn.

static PCollection<KV<String, Long>> getData(
    Pipeline pipeline, String filepattern, MatchConfiguration matchConfiguration) {
  PCollection<Metadata> matches =
      pipeline.apply(
          FileIO.match().filepattern(filepattern).withConfiguration(matchConfiguration));
  return matches.apply(ParDo.of(new ReadFileFn())).apply(Log.ofElements("Get Data"));
}

/**
 * Processes matched files by outputting key-value pairs where the key is "my-key" and the values
 * are the Long values on each line of the file. If a line does not contain a valid Long,
 * Long.parseLong throws a NumberFormatException.
 */
@DoFn.BoundedPerElement
private static final class ReadFileFn extends DoFn<Metadata, KV<String, Long>> {
  private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);

  @ProcessElement
  public void processElement(
      ProcessContext c,
      RestrictionTracker<OffsetRange, Long> tracker,
      ManualWatermarkEstimator<Instant> watermarkEstimator)
      throws IOException {
    Metadata metadata = c.element();
    logger.info(
        "reading {} with restriction {} @ {}",
        metadata,
        tracker.currentRestriction(),
        c.timestamp());
    String filename = metadata.resourceId().toString();
    Instant timestamp = getTimestamp(filename);
    try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
      String line;
      for (long lineNumber = 0; (line = br.readLine()) != null; ++lineNumber) {
        if (lineNumber < tracker.currentRestriction().getFrom()
            || lineNumber >= tracker.currentRestriction().getTo()) {
          continue;
        }
        if (!tracker.tryClaim(lineNumber)) {
          logger.info("failed to claim {}", lineNumber);
          return;
        }
        c.outputWithTimestamp(KV.of("my-key", Long.parseLong(line)), timestamp);
      }
    }
    logger.info("setting watermark to {}", timestamp);
    watermarkEstimator.setWatermark(timestamp);
    logger.info("Finish processing {} in file {}", tracker.currentRestriction(), filename);
  }

  private Instant getTimestamp(String filepath) {
    // Filename is assumed to be either yyyy-MM-dd or yyyy-MM-dd.complete.
    String filename = Paths.get(filepath).getFileName().toString();
    int index = filename.lastIndexOf(".complete");
    if (index != -1) {
      // In the case it has a suffix, strip it.
      filename = filename.substring(0, index);
    }
    Instant timestamp =
        Instant.parse(new StringBuilder().append(filename).append("T00:00:00.000Z").toString());
    if (index != -1) {
      // In the case it has a suffix i.e. it is complete, fast forward to the next day.
      return timestamp.plus(Duration.standardDays(1));
    }
    return timestamp;
  }

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element Metadata metadata) throws IOException {
    long lineCount;
    try (Stream<String> stream = Files.lines(Paths.get(metadata.resourceId().toString()))) {
      lineCount = stream.count();
    }
    return new OffsetRange(0L, lineCount);
  }

  @GetInitialWatermarkEstimatorState
  public Instant getInitialWatermarkEstimatorState(
      @Element Metadata metadata, @Restriction OffsetRange restriction) {
    String filename = metadata.resourceId().toString();
    logger.info("getInitialWatermarkEstimatorState {}", filename);
    // Compute and return the initial watermark estimator state for each element and restriction.
    // All subsequent processing of an element and restriction will be restored from the existing
    // state.
    return getTimestamp(filename);
  }

  private static Instant ensureTimestampWithinBounds(Instant timestamp) {
    if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
      timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
    } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
      timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
    }
    return timestamp;
  }

  @NewWatermarkEstimator
  public WatermarkEstimators.Manual newWatermarkEstimator(
      @WatermarkEstimatorState Instant watermarkEstimatorState) {
    logger.info("newWatermarkEstimator {}", watermarkEstimatorState);
    return new WatermarkEstimators.Manual(ensureTimestampWithinBounds(watermarkEstimatorState));
  }
}




-- 
Best regards,
Piotr

Re: Processing files as they arrive with custom timestamps

Posted by Luke Cwik <lc...@google.com>.
I'm working on a blog post[1] about splittable DoFns that covers this topic.

TL;DR: FileIO.match() should allow users to control the watermark estimator
that is used. For your use case, you should hold the watermark at some
computable value (e.g. if the files are generated every hour, then once you
know the last file for that hour has appeared, you advance the watermark to
the current hour).

1:
https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#heading=h.fo3wm9qs0vql
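
As a rough illustration of holding the watermark at a computable value,
here is a minimal sketch in plain Java (java.time only, no Beam API). The
class HourlyWatermarkHold and its methods are hypothetical names for
illustration. It assumes hourly files and some external signal that the
last file for an hour has appeared. It shows one possible reading of the
idea: the watermark only moves past an hour once that hour and all earlier
hours are known to be complete.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch, independent of Beam: hold the watermark until an hour is complete. */
final class HourlyWatermarkHold {
  private final Set<Instant> completedHours = new TreeSet<>();
  private Instant watermark;

  /** Starts with the watermark held at the beginning of the first expected hour. */
  HourlyWatermarkHold(Instant firstHour) {
    this.watermark = firstHour.truncatedTo(ChronoUnit.HOURS);
  }

  /** Records that the last file for the given hour has appeared. */
  void markHourComplete(Instant hour) {
    Instant start = hour.truncatedTo(ChronoUnit.HOURS);
    if (start.isBefore(watermark)) {
      return; // Already behind the watermark, nothing to do.
    }
    completedHours.add(start);
    // Advance only across contiguous completed hours, so the watermark never
    // passes an hour that may still receive files.
    while (completedHours.remove(watermark)) {
      watermark = watermark.plus(Duration.ofHours(1));
    }
  }

  /** Returns the current hold: the start of the earliest hour not yet known complete. */
  Instant currentWatermark() {
    return watermark;
  }
}
```

In a splittable DoFn, a value computed this way would be what gets passed
to a manual watermark estimator. How that is wired into FileIO.match() is
not shown here.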
