Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/19 20:17:00 UTC

[GitHub] [beam] pabloem commented on a change in pull request #15434: [BEAM-12824] Beam Connector for Reading Data from Delta Lake.

pabloem commented on a change in pull request #15434:
URL: https://github.com/apache/beam/pull/15434#discussion_r732195052



##########
File path: sdks/java/io/deltalake/src/main/java/io/delta/standalone/DeltaLog.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Copyright (2020) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.standalone;

Review comment:
       Sorry, I was not familiar with this style of working with dependencies. Why don't we just depend on the Maven library? (https://mvnrepository.com/artifact/io.delta/delta-standalone)

##########
File path: sdks/java/io/deltalake/src/main/java/org/apache/beam/sdk/io/DeltaFileIO.java
##########
@@ -0,0 +1,653 @@
+package org.apache.beam.sdk.io;
+
+import com.google.auto.value.AutoValue;
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.Snapshot;
+import io.delta.standalone.actions.AddFile;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Contextful;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Requirements;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Transforms for accessing Delta Lake files: listing files (matching) and reading.
+ *
+ * <h2>Getting snapshot</h2>
+ *
+ * <p>{@link #snapshot} and {@link #matchAll} take filepatterns pointing at Delta Lake table roots
+ * (respectively either a single filepattern or a {@link PCollection} thereof), read each table's
+ * Delta log, and return the Parquet data files of the selected snapshot as {@link PCollection
+ * PCollections} of {@link MatchResult.Metadata}. Configuration options for them are in {@link
+ * MatchConfiguration} and include the treatment of filepatterns that don't match anything and
+ * continuous incremental matching (watching for new files).
+ *
+ * <h3>Example: Watching a Delta Lake table for new files</h3>
+ *
+ * <p>This example re-matches a single filepattern every 30 seconds, continuously returns newly
+ * matched files as an unbounded {@code PCollection<Metadata>}, and stops if no new files appear
+ * for 1 hour.
+ *
+ * <pre>{@code
+ * PCollection<Metadata> matches = p.apply(DeltaFileIO.snapshot()
+ *     .filepattern("...")
+ *     .continuously(
+ *       Duration.standardSeconds(30), afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
+ * <h3>Example: Matching a Delta Lake table at a specific version</h3>
+ *
+ * <p>This example matches the Parquet data files that make up version 5 of a Delta Lake table.
+ *
+ * <pre>{@code
+ * PCollection<Metadata> matches = p.apply(DeltaFileIO.snapshot()
+ *     .filepattern("...")
+ *     .withVersion(5L));
+ * }</pre>
+ *
+ * <h2>Reading files</h2>
+ *
+ * <p>{@link #readMatches} converts each result of {@link #snapshot} or {@link #matchAll} to a {@link
+ * ReadableFile} that is convenient for reading a file's contents, optionally decompressing it.
+ *
+ * <h3>Example: Reading the Parquet files of the latest snapshot of a Delta Lake table</h3>
+ *
+ * <pre>{@code
+ * // "schema" is a placeholder for the Avro Schema of the table's Parquet files.
+ * PCollection<GenericRecord> records = pipeline
+ *     .apply("Get Snapshot",
+ *         DeltaFileIO.snapshot()
+ *             .filepattern(filePattern)
+ *             .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
+ *     .apply("Read matched files", DeltaFileIO.readMatches())
+ *     .apply("Read parquet files", ParquetIO.readFiles(schema));
+ * }</pre>
+ *
+ */
+
+@SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class DeltaFileIO
+{
+  private static final Logger LOG = LoggerFactory.getLogger(DeltaFileIO.class);
+
+  /**
+   * Processes a filepattern pointing at a Delta Lake table with Delta Standalone and produces a
+   * collection of the table's Parquet data files as {@link MatchResult.Metadata}.
+   *
+   * <p>By default, matches the filepattern once and produces a bounded {@link PCollection}. To
+   * continuously watch the filepattern for new matches, use {@link Match#continuously(Duration,
+   * TerminationCondition)}; this produces an unbounded {@link PCollection}.
+   *
+   * <p>By default, a filepattern matching no resources is treated according to {@link
+   * EmptyMatchTreatment#DISALLOW}. To configure this behavior, use {@link
+   * Match#withEmptyMatchTreatment}.
+   *
+   * <p>Returned {@link MatchResult.Metadata} are deduplicated by filename. For example, if this
+   * transform observes a file with the same name several times with different metadata (e.g.
+   * because the file is growing), it will emit the metadata the first time this file is observed,
+   * and will ignore future changes to this file.
+   */
+  public static Match snapshot() {
+    return new AutoValue_DeltaFileIO_Match.Builder()
+        .setConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .build();
+  }
+
+  /**
+   * Like {@link #snapshot}, but matches each filepattern in a collection of filepatterns.
+   *
+   * <p>Resources are not deduplicated between filepatterns, i.e. if the same resource matches
+   * multiple filepatterns, it will be produced multiple times.
+   *
+   * <p>By default, a filepattern matching no resources is treated according to {@link
+   * EmptyMatchTreatment#ALLOW_IF_WILDCARD}. To configure this behavior, use {@link
+   * MatchAll#withEmptyMatchTreatment}.
+   */
+  public static MatchAll matchAll() {
+    return new AutoValue_DeltaFileIO_MatchAll.Builder()
+        .setConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .build();
+  }
+
+  /**
+   * Converts each result of {@link #snapshot} or {@link #matchAll} to a {@link ReadableFile} which can
+   * be used to read the contents of each file, optionally decompressing it.
+   */
+  public static ReadMatches readMatches() {
+    return new AutoValue_DeltaFileIO_ReadMatches.Builder()
+        .setCompression(Compression.AUTO)
+        .setDirectoryTreatment(ReadMatches.DirectoryTreatment.SKIP)
+        .build();
+  }
+
+
+  /**
+   * Describes configuration for matching filepatterns, such as {@link EmptyMatchTreatment} and
+   * continuous watching for matching files.
+   */
+  @AutoValue
+  public abstract static class MatchConfiguration implements HasDisplayData, Serializable {
+    /** Creates a {@link MatchConfiguration} with the given {@link EmptyMatchTreatment}. */
+    public static MatchConfiguration create(EmptyMatchTreatment emptyMatchTreatment) {
+      return new AutoValue_DeltaFileIO_MatchConfiguration.Builder()
+          .setEmptyMatchTreatment(emptyMatchTreatment)
+          .build();
+    }
+
+    public abstract EmptyMatchTreatment getEmptyMatchTreatment();
+
+    public abstract @Nullable Duration getWatchInterval();
+
+    abstract @Nullable TerminationCondition<String, ?> getWatchTerminationCondition();
+
+    abstract @Nullable Long getVersion();
+
+    abstract @Nullable Long getTimestamp();
+
+    abstract @Nullable SerializableConfiguration getHadoopConfiguration();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
+
+      abstract Builder setWatchInterval(Duration watchInterval);
+
+      abstract Builder setWatchTerminationCondition(TerminationCondition<String, ?> condition);
+
+      abstract Builder setVersion(Long version);
+
+      abstract Builder setTimestamp(Long timestamp);
+
+      abstract Builder setHadoopConfiguration(SerializableConfiguration hadoopConfiguration);
+
+      abstract MatchConfiguration build();
+    }
+
+    /** Sets the {@link EmptyMatchTreatment}. */
+    public MatchConfiguration withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
+    public MatchConfiguration withVersion(Long version) {
+      return toBuilder().setVersion(version).build();
+    }
+
+    public MatchConfiguration withTimestamp(Long timestamp) {
+      return toBuilder().setTimestamp(timestamp).build();
+    }
+
+    public MatchConfiguration withHadoopConfiguration(SerializableConfiguration hadoopConfiguration) {
+      return toBuilder().setHadoopConfiguration(hadoopConfiguration).build();
+    }
+
+    /**
+     * Continuously watches for new files at the given interval until the given termination
+     * condition is reached, where the input to the condition is the filepattern.
+     */
+    public MatchConfiguration continuously(
+        Duration interval, TerminationCondition<String, ?> condition) {
+      return toBuilder().setWatchInterval(interval).setWatchTerminationCondition(condition).build();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder
+          .add(
+              DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+                  .withLabel("Treatment of filepatterns that match no files"))
+          .addIfNotNull(
+              DisplayData.item("watchForNewFilesInterval", getWatchInterval())
+                  .withLabel("Interval to watch for new files"));
+    }
+  }
+
+  /** Implementation of {@link #snapshot}. */
+  @AutoValue
+  public abstract static class Match extends PTransform<PBegin, PCollection<MatchResult.Metadata>>
+  {
+    abstract @Nullable ValueProvider<String> getFilepattern();
+
+    abstract MatchConfiguration getConfiguration();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setFilepattern(ValueProvider<String> filepattern);
+
+      abstract Builder setConfiguration(MatchConfiguration configuration);
+
+      abstract Match build();
+    }
+
+    /** Matches the given filepattern. */
+    public Match filepattern(String filepattern) {
+      return this.filepattern(StaticValueProvider.of(filepattern));
+    }
+
+    /** Like {@link #filepattern(String)} but using a {@link ValueProvider}. */
+    public Match filepattern(ValueProvider<String> filepattern) {
+      return toBuilder().setFilepattern(filepattern).build();
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public Match withConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setConfiguration(configuration).build();
+    }
+
+    /** See {@link MatchConfiguration#withEmptyMatchTreatment(EmptyMatchTreatment)}. */
+    public Match withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withConfiguration(getConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    public Match withVersion(Long version) {
+      return withConfiguration(getConfiguration().withVersion(version));
+    }
+
+    public Match withTimestamp(Long timestamp) {
+      return withConfiguration(getConfiguration().withTimestamp(timestamp));
+    }
+
+    public Match withHadoopConfiguration(Configuration hadoopConfiguration) {
+      return withConfiguration(getConfiguration()
+          .withHadoopConfiguration(new SerializableConfiguration(hadoopConfiguration)));
+    }
+
+    /**
+     * See {@link MatchConfiguration#continuously}. The returned {@link PCollection} is unbounded.
+     *
+     * <p>This works only in runners supporting splittable {@link
+     * org.apache.beam.sdk.transforms.DoFn}.
+     */
+    public Match continuously(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return withConfiguration(getConfiguration().continuously(pollInterval, terminationCondition));
+    }
+
+    @Override
+    public PCollection<MatchResult.Metadata> expand(PBegin input) {
+      return input
+          .apply("Delta Create Filepattern",
+              Create.ofProvider(getFilepattern(), StringUtf8Coder.of())
+          )
+          .apply("Delta Via MatchAll",
+              matchAll().withConfiguration(getConfiguration())
+          )
+      ;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(
+              DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
+          .include("configuration", getConfiguration());
+    }
+  }
+
+  /** Implementation of {@link #matchAll}. */
+  @AutoValue
+  public abstract static class MatchAll
+      extends PTransform<PCollection<String>, PCollection<MatchResult.Metadata>>
+  {
+    private static final Logger logger = LoggerFactory.getLogger(MatchAll.class);
+
+    abstract MatchConfiguration getConfiguration();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setConfiguration(MatchConfiguration configuration);
+
+      abstract MatchAll build();
+    }
+
+    /** Like {@link Match#withConfiguration}. */
+    public MatchAll withConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setConfiguration(configuration).build();
+    }
+
+    /** Like {@link Match#withEmptyMatchTreatment}. */
+    public MatchAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withConfiguration(getConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /** Like {@link Match#continuously}. */
+    public MatchAll continuously(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return withConfiguration(getConfiguration().continuously(pollInterval, terminationCondition));
+    }
+
+    @Override
+    public PCollection<MatchResult.Metadata> expand(PCollection<String> input)
+    {
+      logger.info("matchAll config: {}", getConfiguration());
+
+      PCollection<MatchResult.Metadata> res;
+      if (getConfiguration().getWatchInterval() == null) {
+        res =
+            input.apply(
+                "Delta MatchAll FilePatterns",
+                ParDo.of(new MatchOnceFn(getConfiguration())));
+      } else {
+        res =
+            input.apply(
+                    "Delta Continuously MatchAll filePatterns",
+                    Watch.growthOf(
+                            Contextful.of(new MatchPollFn(getConfiguration()), Requirements.empty()),
+                            new ExtractFilenameFn())
+                        .withPollInterval(getConfiguration().getWatchInterval())
+                        .withTerminationPerInput(getConfiguration().getWatchTerminationCondition()))
+                .apply(Values.create());
+      }
+      return res.apply("Delta Reshuffle files", Reshuffle.viaRandomKey());
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.include("configuration", getConfiguration());
+    }
+
+    private static class MatchOnceFn extends DoFn<String, MatchResult.Metadata>
+    {
+      private final MatchConfiguration matchConfiguration;
+
+      MatchOnceFn(MatchConfiguration matchConfiguration) {
+        this.matchConfiguration = matchConfiguration;
+      }
+
+      @ProcessElement
+      public void process(ProcessContext c) throws Exception
+      {
+        LOG.info("DeltaFileIO:MatchOnceFn processing {}", matchConfiguration);
+
+        String filePattern = c.element();
+
+        Configuration hadoopConfiguration = (matchConfiguration.getHadoopConfiguration() != null) ?
+            matchConfiguration.getHadoopConfiguration().get() : new Configuration();
+
+        DeltaLog deltaLog = getDeltaLog(filePattern, hadoopConfiguration);
+        Snapshot deltaSnapshot = getDeltaSnapshot(deltaLog, matchConfiguration);
+
+        List<AddFile> deltaFiles = deltaSnapshot.getAllFiles();
+
+        LOG.info("DeltaFileIO:MatchOnceFn DeltaLog.forTables: version={}, numberOfFiles={}",
+            deltaSnapshot.getVersion(), deltaFiles.size());
+
+        String separator = filePattern.endsWith("/") ? "" : "/";
+
+        for (AddFile file : deltaFiles) {
+          String fullPath = filePattern + separator + file.getPath();
+          MatchResult match = FileSystems.match(fullPath, matchConfiguration.getEmptyMatchTreatment());
+          LOG.info("DeltaFileIO will process {}", match);

Review comment:
       maybe make this log `debug` level?
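For context on the loop quoted above: each data-file path is built by appending `AddFile.getPath()` to the table root, inserting a `/` only when the root lacks a trailing one. The sketch below isolates that step as a plain Java helper. The names `DeltaPaths` and `resolve` are hypothetical, and the early return for already-absolute paths is an assumption that goes beyond the PR: the Delta transaction protocol allows an `add` action to carry an absolute path URI rather than a path relative to the table root. Percent-decoding of the path, which the protocol also calls for, is not shown.

```java
import java.util.regex.Pattern;

/**
 * Hypothetical helper (not part of the PR) that builds the full path of a data file listed in a
 * Delta snapshot, mirroring the separator logic in MatchOnceFn.
 */
final class DeltaPaths {
  // Assumption: a leading URI scheme such as "gs:" or "s3a:" marks an absolute path.
  private static final Pattern URI_SCHEME = Pattern.compile("^[A-Za-z][A-Za-z0-9+.-]*:");

  private DeltaPaths() {}

  static String resolve(String tableRoot, String addFilePath) {
    // Absolute paths (scheme-qualified or rooted at "/") are used as-is.
    if (addFilePath.startsWith("/") || URI_SCHEME.matcher(addFilePath).lookingAt()) {
      return addFilePath;
    }
    // Relative paths are appended to the table root, adding "/" only if it is missing,
    // which is the same rule MatchOnceFn applies.
    String separator = tableRoot.endsWith("/") ? "" : "/";
    return tableRoot + separator + addFilePath;
  }
}
```

With such a helper, the loop body would compute `DeltaPaths.resolve(filePattern, file.getPath())` before calling `FileSystems.match`.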

##########
File path: sdks/java/io/deltalake/src/main/java/org/apache/beam/sdk/io/DeltaFileIO.java
##########
@@ -0,0 +1,653 @@
+    public Match withVersion(Long version) {
+      return withConfiguration(getConfiguration().withVersion(version));
+    }
+
+    public Match withTimestamp(Long timestamp) {
+      return withConfiguration(getConfiguration().withTimestamp(timestamp));
+    }

Review comment:
       Since these are not standard FileIO methods, can you add detailed documentation on what they represent?
   
   I was going to propose we use `Instant` for timestamps, but I see that Delta Lake uses integer timestamps in metadata files, huh? So maybe it's also worth clarifying that we expect timestamps measured from the epoch.
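As a sketch of what that clarification would mean for callers: assuming `withTimestamp(Long)` expects milliseconds since the Unix epoch (the unit Delta Lake records in commit timestamps; the PR itself does not yet document the unit, so this is an assumption), a caller holding a date-time value would convert it as below. `DeltaTimestamps` is a hypothetical helper, not part of the PR.

```java
import java.time.Instant;
import java.time.OffsetDateTime;

/**
 * Hypothetical helper (not part of the PR): converts a point in time to the epoch-millisecond
 * value that withTimestamp(Long) is assumed to expect.
 */
final class DeltaTimestamps {
  private DeltaTimestamps() {}

  /** Milliseconds elapsed since 1970-01-01T00:00:00Z. */
  static long toEpochMillis(Instant instant) {
    return instant.toEpochMilli();
  }

  /** Parses an ISO-8601 date-time with an offset, e.g. "2021-10-19T20:17:00Z". */
  static long toEpochMillis(String isoDateTimeWithOffset) {
    return OffsetDateTime.parse(isoDateTimeWithOffset).toInstant().toEpochMilli();
  }
}
```

A call would then look like `DeltaFileIO.snapshot().filepattern(path).withTimestamp(DeltaTimestamps.toEpochMillis("2021-10-19T20:17:00Z"))`. With the Joda-Time `Instant` that the PR already imports, `getMillis()` returns the same epoch-millisecond value.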

##########
File path: sdks/java/io/deltalake/src/main/java/org/apache/beam/sdk/io/DeltaFileIO.java
##########
@@ -0,0 +1,653 @@
+package org.apache.beam.sdk.io;
+
+import com.google.auto.value.AutoValue;
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.Snapshot;
+import io.delta.standalone.actions.AddFile;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Contextful;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Requirements;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Transforms for accessing Delta Lake files: listing files (matching) and reading.
+ *
+ * <h2>Getting snapshot</h2>
+ *
+ * <p>{@link #snapshot} and {@link #matchAll} match filepatterns (respectively either a single
+ * filepattern or a {@link PCollection} thereof) and return the files that match them as {@link
+ * PCollection PCollections} of {@link MatchResult.Metadata}. Configuration options for them are in
+ * {@link MatchConfiguration} and include features such as treatment of filepatterns that don't
+ * match anything and continuous incremental matching of filepatterns (watching for new files).
+ *
+ * <h3>Example: Watching a Delta Lake table for new files</h3>
+ *
+ * <p>This example matches a single filepattern repeatedly every 30 seconds, continuously returns
+ * new matched files as an unbounded {@code PCollection<Metadata>} and stops if no new files appear
+ * for 1 hour.
+ *
+ * <pre>{@code
+ * PCollection<Metadata> matches = p.apply(DeltaFileIO.snapshot()
+ *     .filepattern("...")
+ *     .continuously(
+ *       Duration.standardSeconds(30), afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
+ * <h3>Example: Matching a Delta Lake directory for a specific version</h3>
+ *
+ * <p>This example matches the Parquet files that make up version 5 of a Delta Lake table.
+ *
+ * <pre>{@code
+ * PCollection<Metadata> matches = p.apply(DeltaFileIO.snapshot()
+ *     .filepattern("...")
+ *     .withVersion(5L));
+ * }</pre>
+ *
+ * <h2>Reading files</h2>
+ *
+ * <p>{@link #readMatches} converts each result of {@link #snapshot} or {@link #matchAll} to a {@link
+ * ReadableFile} that is convenient for reading a file's contents, optionally decompressing it.
+ *
+ * <h3>Example: Reading the Parquet files of the latest Delta Lake snapshot</h3>
+ *
+ * <pre>{@code
+ * pipeline
+ *     .apply("Get Snapshot",
+ *         DeltaFileIO.snapshot()
+ *             .filepattern(filePattern)
+ *             .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
+ *     .apply("Read matched files", DeltaFileIO.readMatches())
+ *     .apply("Read parquet files", ParquetIO.readFiles(schema));
+ * }</pre>
+ *
+ */
+
+@SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class DeltaFileIO
+{
+  private static final Logger LOG = LoggerFactory.getLogger(DeltaFileIO.class);
+
+  /**
+   * Processes a filepattern using Delta Lake Standalone and produces the Parquet files of the
+   * selected table snapshot as {@link MatchResult.Metadata}.
+   *
+   * <p>By default, matches the filepattern once and produces a bounded {@link PCollection}. To
+   * continuously watch the filepattern for new matches, use {@link MatchAll#continuously(Duration,
+   * TerminationCondition)} - this will produce an unbounded {@link PCollection}.
+   *
+   * <p>By default, a filepattern matching no resources is treated according to {@link
+   * EmptyMatchTreatment#DISALLOW}. To configure this behavior, use {@link
+   * Match#withEmptyMatchTreatment}.
+   *
+   * <p>Returned {@link MatchResult.Metadata} are deduplicated by filename. For example, if this
+   * transform observes a file with the same name several times with different metadata (e.g.
+   * because the file is growing), it will emit the metadata the first time this file is observed,
+   * and will ignore future changes to this file.
+   */
+  public static Match snapshot() {
+    return new AutoValue_DeltaFileIO_Match.Builder()
+        .setConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .build();
+  }
+
+  /**
+   * Like {@link #snapshot}, but matches each filepattern in a collection of filepatterns.
+   *
+   * <p>Resources are not deduplicated between filepatterns, i.e. if the same resource matches
+   * multiple filepatterns, it will be produced multiple times.
+   *
+   * <p>By default, a filepattern matching no resources is treated according to {@link
+   * EmptyMatchTreatment#ALLOW_IF_WILDCARD}. To configure this behavior, use {@link
+   * MatchAll#withEmptyMatchTreatment}.
+   */
+  public static MatchAll matchAll() {
+    return new AutoValue_DeltaFileIO_MatchAll.Builder()
+        .setConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .build();
+  }
+
+  /**
+   * Converts each result of {@link #snapshot} or {@link #matchAll} to a {@link ReadableFile} which can
+   * be used to read the contents of each file, optionally decompressing it.
+   */
+  public static ReadMatches readMatches() {
+    return new AutoValue_DeltaFileIO_ReadMatches.Builder()
+        .setCompression(Compression.AUTO)
+        .setDirectoryTreatment(ReadMatches.DirectoryTreatment.SKIP)
+        .build();
+  }
+
+
+  /**
+   * Describes configuration for matching filepatterns, such as {@link EmptyMatchTreatment} and
+   * continuous watching for matching files.
+   */
+  @AutoValue
+  public abstract static class MatchConfiguration implements HasDisplayData, Serializable {
+    /** Creates a {@link MatchConfiguration} with the given {@link EmptyMatchTreatment}. */
+    public static MatchConfiguration create(EmptyMatchTreatment emptyMatchTreatment) {
+      return new AutoValue_DeltaFileIO_MatchConfiguration.Builder()
+          .setEmptyMatchTreatment(emptyMatchTreatment)
+          .build();
+    }
+
+    public abstract EmptyMatchTreatment getEmptyMatchTreatment();
+
+    public abstract @Nullable Duration getWatchInterval();
+
+    abstract @Nullable TerminationCondition<String, ?> getWatchTerminationCondition();
+
+    abstract @Nullable Long getVersion();
+
+    abstract @Nullable Long getTimestamp();
+
+    abstract @Nullable SerializableConfiguration getHadoopConfiguration();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
+
+      abstract Builder setWatchInterval(Duration watchInterval);
+
+      abstract Builder setWatchTerminationCondition(TerminationCondition<String, ?> condition);
+
+      abstract Builder setVersion(Long version);
+
+      abstract Builder setTimestamp(Long timestamp);
+
+      abstract Builder setHadoopConfiguration(SerializableConfiguration hadoopConfiguration);
+
+      abstract MatchConfiguration build();
+    }
+
+    /** Sets the {@link EmptyMatchTreatment}. */
+    public MatchConfiguration withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
+    public MatchConfiguration withVersion(Long version) {
+      return toBuilder().setVersion(version).build();
+    }
+
+    public MatchConfiguration withTimestamp(Long timestamp) {
+      return toBuilder().setTimestamp(timestamp).build();
+    }
+
+    public MatchConfiguration withHadoopConfiguration(SerializableConfiguration hadoopConfiguration) {
+      return toBuilder().setHadoopConfiguration(hadoopConfiguration).build();
+    }
+
+    /**
+     * Continuously watches for new files at the given interval until the given termination
+     * condition is reached, where the input to the condition is the filepattern.
+     */
+    public MatchConfiguration continuously(
+        Duration interval, TerminationCondition<String, ?> condition) {
+      return toBuilder().setWatchInterval(interval).setWatchTerminationCondition(condition).build();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder
+          .add(
+              DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+                  .withLabel("Treatment of filepatterns that match no files"))
+          .addIfNotNull(
+              DisplayData.item("watchForNewFilesInterval", getWatchInterval())
+                  .withLabel("Interval to watch for new files"));
+    }
+  }
+
+  /** Implementation of {@link #snapshot}. */
+  @AutoValue
+  public abstract static class Match extends PTransform<PBegin, PCollection<MatchResult.Metadata>>
+  {
+    abstract @Nullable ValueProvider<String> getFilepattern();
+
+    abstract MatchConfiguration getConfiguration();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setFilepattern(ValueProvider<String> filepattern);
+
+      abstract Builder setConfiguration(MatchConfiguration configuration);
+
+      abstract Match build();
+    }
+
+    /** Matches the given filepattern. */
+    public Match filepattern(String filepattern) {
+      return this.filepattern(StaticValueProvider.of(filepattern));
+    }
+
+    /** Like {@link #filepattern(String)} but using a {@link ValueProvider}. */
+    public Match filepattern(ValueProvider<String> filepattern) {
+      return toBuilder().setFilepattern(filepattern).build();
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public Match withConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setConfiguration(configuration).build();
+    }
+
+    /** See {@link MatchConfiguration#withEmptyMatchTreatment(EmptyMatchTreatment)}. */
+    public Match withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withConfiguration(getConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    public Match withVersion(Long version) {
+      return withConfiguration(getConfiguration().withVersion(version));
+    }
+
+    public Match withTimestamp(Long timestamp) {
+      return withConfiguration(getConfiguration().withTimestamp(timestamp));
+    }
+
+    public Match withHadoopConfiguration(Configuration hadoopConfiguration) {
+      return withConfiguration(getConfiguration()
+          .withHadoopConfiguration(new SerializableConfiguration(hadoopConfiguration)));
+    }
+
+    /**
+     * See {@link MatchConfiguration#continuously}. The returned {@link PCollection} is unbounded.
+     *
+     * <p>This works only in runners supporting splittable {@link
+     * org.apache.beam.sdk.transforms.DoFn}.
+     */
+    public Match continuously(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return withConfiguration(getConfiguration().continuously(pollInterval, terminationCondition));
+    }
+
+    @Override
+    public PCollection<MatchResult.Metadata> expand(PBegin input) {
+      return input
+          .apply("Delta Create Filepattern",
+              Create.ofProvider(getFilepattern(), StringUtf8Coder.of())
+          )
+          .apply("Delta Via MatchAll",
+              matchAll().withConfiguration(getConfiguration())
+          )
+      ;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(
+              DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
+          .include("configuration", getConfiguration());
+    }
+  }
+
+  /** Implementation of {@link #matchAll}. */
+  @AutoValue
+  public abstract static class MatchAll
+      extends PTransform<PCollection<String>, PCollection<MatchResult.Metadata>>
+  {
+    private static final Logger logger = LoggerFactory.getLogger(MatchAll.class);
+
+    abstract MatchConfiguration getConfiguration();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setConfiguration(MatchConfiguration configuration);
+
+      abstract MatchAll build();
+    }
+
+    /** Like {@link Match#withConfiguration}. */
+    public MatchAll withConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setConfiguration(configuration).build();
+    }
+
+    /** Like {@link Match#withEmptyMatchTreatment}. */
+    public MatchAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withConfiguration(getConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /** Like {@link Match#continuously}. */
+    public MatchAll continuously(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return withConfiguration(getConfiguration().continuously(pollInterval, terminationCondition));
+    }
+
+    @Override
+    public PCollection<MatchResult.Metadata> expand(PCollection<String> input)
+    {
+      logger.info("matchAll config: {}", getConfiguration());
+
+      PCollection<MatchResult.Metadata> res;
+      if (getConfiguration().getWatchInterval() == null) {
+        res =
+            input.apply(
+                "Delta MatchAll FilePatterns",
+                ParDo.of(new MatchOnceFn(getConfiguration())));
+      } else {
+        res =
+            input.apply(
+                    "Delta Continuously MatchAll filePatterns",
+                    Watch.growthOf(
+                            Contextful.of(new MatchPollFn(getConfiguration()), Requirements.empty()),
+                            new ExtractFilenameFn())
+                        .withPollInterval(getConfiguration().getWatchInterval())
+                        .withTerminationPerInput(getConfiguration().getWatchTerminationCondition()))
+                .apply(Values.create());
+      }
+      return res.apply("Delta Reshuffle files", Reshuffle.viaRandomKey());
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.include("configuration", getConfiguration());
+    }
+
+    private static class MatchOnceFn extends DoFn<String, MatchResult.Metadata>
+    {
+      private final MatchConfiguration matchConfiguration;
+
+      MatchOnceFn(MatchConfiguration matchConfiguration) {
+        this.matchConfiguration = matchConfiguration;
+      }
+
+      @ProcessElement
+      public void process(ProcessContext c) throws Exception
+      {
+        LOG.info("DeltaFileIO:MatchOnceFn processing {}", matchConfiguration);
+
+        String filePattern = c.element();
+
+        Configuration hadoopConfiguration = (matchConfiguration.getHadoopConfiguration() != null) ?
+            matchConfiguration.getHadoopConfiguration().get() : new Configuration();
+
+        DeltaLog deltaLog = getDeltaLog(filePattern, hadoopConfiguration);
+        Snapshot deltaSnapshot = getDeltaSnapshot(deltaLog, matchConfiguration);
+
+        List<AddFile> deltaFiles = deltaSnapshot.getAllFiles();
+
+        LOG.info("DeltaFileIO:MatchOnceFn DeltaLog.forTable: version={}, numberOfFiles={}",
+            deltaSnapshot.getVersion(), deltaFiles.size());
+
+        String separator = filePattern.endsWith("/") ? "" : "/";
+
+        for (AddFile file : deltaFiles) {
+          String fullPath = filePattern + separator + file.getPath();
+          MatchResult match = FileSystems.match(fullPath, matchConfiguration.getEmptyMatchTreatment());
+          LOG.info("DeltaFileIO will process {}", match);
+          for (MatchResult.Metadata metadata : match.metadata()) {
+            c.output(metadata);
+          }
+        }
+      }
+    }
+
+    static DeltaLog getDeltaLog(String filePattern, Configuration hadoopConfiguration)
+    {
+      // Delta Standalone uses the Hadoop FileSystem API, so replace the s3:// scheme with s3a://
+      String deltaFilePattern = (filePattern.startsWith("s3://")) ?
+          filePattern.replaceFirst("s3://", "s3a://") : filePattern;
+      LOG.info("DeltaFileIO trying DeltaLog.forTable for pattern {}", deltaFilePattern);
+
+      DeltaLog deltaLog = DeltaLog.forTable(hadoopConfiguration, deltaFilePattern);
+      return deltaLog;
+    }
+
+    static Snapshot getDeltaSnapshot(DeltaLog deltaLog, MatchConfiguration matchConfiguration)
+    {
+      Snapshot deltaSnapshot;
+
+      if (matchConfiguration.getVersion() != null) {
+        logger.info("DeltaFileIO creating snapshot for version {}", matchConfiguration.getVersion());
+        deltaSnapshot = deltaLog.getSnapshotForVersionAsOf(matchConfiguration.getVersion());
+      } else if (matchConfiguration.getTimestamp() != null) {
+        logger.info("DeltaFileIO creating snapshot for timestamp {}", matchConfiguration.getTimestamp());
+        deltaSnapshot = deltaLog.getSnapshotForTimestampAsOf(matchConfiguration.getTimestamp());
+      } else {
+        logger.info("DeltaFileIO creating snapshot for the latest version");
+        deltaSnapshot = deltaLog.snapshot();
+      }
+
+      return deltaSnapshot;
+    }
+
+    private static class MatchPollFn extends PollFn<String, MatchResult.Metadata>
+    {
+      private final MatchConfiguration matchConfiguration;
+      private final long pollDutationMs;
+
+      private transient DeltaLog deltaLog = null;
+      private transient long deltaSnapshotVersion = -1;
+
+      MatchPollFn(MatchConfiguration matchConfiguration) {
+        this.matchConfiguration = matchConfiguration;
+        pollDutationMs =  matchConfiguration.getWatchInterval().getStandardSeconds() * 1000L;
+        LOG.info("DeltaFileIO.MatchPollFn created, pollDutationMs=" + pollDutationMs);
+      }
+
+      @Override
+      public Watch.Growth.PollResult<MatchResult.Metadata> apply(String filePattern, Context c) throws Exception
+      {
+        Instant now = Instant.now();
+
+        List<MatchResult.Metadata> filesMetadata;
+
+        Snapshot deltaSnapshot;
+        if (deltaLog == null) {
+          Configuration hadoopConfiguration = (matchConfiguration.getHadoopConfiguration() != null) ?
+              matchConfiguration.getHadoopConfiguration().get() : new Configuration();
+          deltaLog = getDeltaLog(filePattern, hadoopConfiguration);
+
+          deltaSnapshot = getDeltaSnapshot(deltaLog, matchConfiguration);
+        } else {
+          deltaSnapshot = deltaLog.update();

Review comment:
       Don't we need to use the `matchConfiguration` when we update the deltaLog, so that we only get updates for a given version/timestamp if one is specified in the match config?
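A rough sketch of what honoring the configuration on each poll could look like. It is plain Java, not the PR's code: `SnapshotVersionChooser` is a hypothetical class, and the `LongSupplier` is an assumed stand-in for refreshing the Delta log (e.g. via `deltaLog.update()`) and reading the resulting snapshot's version. When a version is pinned, every poll reads that same version, so the watch never advances past it; a pinned timestamp would presumably be resolved to a version once and handled the same way.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch: picks the Delta table version a polling step should read.
 * A version pinned by the match configuration is honored on every poll instead of
 * advancing to whatever the latest version happens to be.
 */
final class SnapshotVersionChooser {
  private final Long pinnedVersion;         // null means "follow the latest version"
  private final LongSupplier latestVersion; // stand-in for refreshing the Delta log

  SnapshotVersionChooser(Long pinnedVersion, LongSupplier latestVersion) {
    this.pinnedVersion = pinnedVersion;
    this.latestVersion = latestVersion;
  }

  /** Returns the pinned version if one was configured, otherwise the current latest. */
  long versionForThisPoll() {
    return pinnedVersion != null ? pinnedVersion : latestVersion.getAsLong();
  }

  /** A pinned snapshot never changes, so polling it again cannot yield new files. */
  boolean canProduceNewFiles() {
    return pinnedVersion == null;
  }
}
```

With a pinned version, the poll function could likely also report its output as complete after the first poll, since later polls would only repeat the same files.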

##########
File path: sdks/java/io/deltalake/src/main/java/org/apache/beam/sdk/io/DeltaFileIO.java
##########
@@ -0,0 +1,653 @@
+    public MatchConfiguration withVersion(Long version) {
+      return toBuilder().setVersion(version).build();
+    }
+
+    public MatchConfiguration withTimestamp(Long timestamp) {
+      return toBuilder().setTimestamp(timestamp).build();

Review comment:
       Please document these two in detail as well.
   
   Also, does it make sense to add a check so that only one of them can be set, and error out if the user tries to set both?
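One possible shape for that check, sketched in plain Java with a hypothetical, simplified `SnapshotSelector` standing in for `MatchConfiguration` (the PR already imports Guava's `checkArgument`, which could replace the explicit `if`/`throw`). The Javadoc wording assumes Delta Lake's time-travel semantics, where a timestamp selects the latest snapshot committed at or before that instant:

```java
/**
 * Hypothetical, simplified stand-in for MatchConfiguration: immutable, and rejects
 * configurations that pin both a version and a timestamp.
 */
final class SnapshotSelector {
  private final Long version;   // Delta table version to read, or null
  private final Long timestamp; // epoch millis to time-travel to, or null

  private SnapshotSelector(Long version, Long timestamp) {
    if (version != null && timestamp != null) {
      throw new IllegalArgumentException(
          "Set at most one of withVersion() and withTimestamp(), not both");
    }
    this.version = version;
    this.timestamp = timestamp;
  }

  /** Reads the latest snapshot when neither a version nor a timestamp is set. */
  static SnapshotSelector latest() {
    return new SnapshotSelector(null, null);
  }

  /** Reads the snapshot at the given table version (a non-negative commit number). */
  SnapshotSelector withVersion(long version) {
    if (version < 0) {
      throw new IllegalArgumentException("version must be >= 0, got " + version);
    }
    return new SnapshotSelector(version, timestamp);
  }

  /** Reads the latest snapshot committed at or before the given epoch-millis timestamp. */
  SnapshotSelector withTimestamp(long epochMillis) {
    return new SnapshotSelector(version, epochMillis);
  }

  Long version() {
    return version;
  }

  Long timestamp() {
    return timestamp;
  }
}
```

Validating in the constructor means the error surfaces at pipeline construction time, when the user calls the second setter, rather than later on a worker.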

##########
File path: sdks/java/io/deltalake/src/main/java/org/apache/beam/sdk/io/DeltaFileIO.java
##########
@@ -0,0 +1,653 @@
+   * continuous watching for matching files.
+   */
+  @AutoValue
+  public abstract static class MatchConfiguration implements HasDisplayData, Serializable {
+    /** Creates a {@link MatchConfiguration} with the given {@link EmptyMatchTreatment}. */
+    public static MatchConfiguration create(EmptyMatchTreatment emptyMatchTreatment) {
+      return new AutoValue_DeltaFileIO_MatchConfiguration.Builder()
+          .setEmptyMatchTreatment(emptyMatchTreatment)
+          .build();
+    }
+
+    public abstract EmptyMatchTreatment getEmptyMatchTreatment();
+
+    public abstract @Nullable Duration getWatchInterval();
+
+    abstract @Nullable TerminationCondition<String, ?> getWatchTerminationCondition();
+
+    abstract @Nullable Long getVersion();
+
+    abstract @Nullable Long getTimestamp();
+
+    abstract @Nullable SerializableConfiguration getHadoopConfiguration();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
+
+      abstract Builder setWatchInterval(Duration watchInterval);
+
+      abstract Builder setWatchTerminationCondition(TerminationCondition<String, ?> condition);
+
+      abstract Builder setVersion(Long version);
+
+      abstract Builder setTimestamp(Long timestamp);
+
+      abstract Builder setHadoopConfiguration(SerializableConfiguration hadoopConfiguration);
+
+      abstract MatchConfiguration build();
+    }
+
+    /** Sets the {@link EmptyMatchTreatment}. */
+    public MatchConfiguration withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
+    public MatchConfiguration withVersion(Long version) {
+      return toBuilder().setVersion(version).build();
+    }
+
+    public MatchConfiguration withTimestamp(Long timestamp) {
+      return toBuilder().setTimestamp(timestamp).build();
+    }
+
+    public MatchConfiguration withHadoopConfiguration(SerializableConfiguration hadoopConfiguration) {
+      return toBuilder().setHadoopConfiguration(hadoopConfiguration).build();
+    }
+
+    /**
+     * Continuously watches for new files at the given interval until the given termination
+     * condition is reached, where the input to the condition is the filepattern.
+     */
+    public MatchConfiguration continuously(
+        Duration interval, TerminationCondition<String, ?> condition) {
+      return toBuilder().setWatchInterval(interval).setWatchTerminationCondition(condition).build();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder
+          .add(
+              DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+                  .withLabel("Treatment of filepatterns that match no files"))
+          .addIfNotNull(
+              DisplayData.item("watchForNewFilesInterval", getWatchInterval())
+                  .withLabel("Interval to watch for new files"));
+    }
+  }
+
+  /** Implementation of {@link #snapshot}. */
+  @AutoValue
+  public abstract static class Match extends PTransform<PBegin, PCollection<MatchResult.Metadata>>
+  {
+    abstract @Nullable ValueProvider<String> getFilepattern();
+
+    abstract MatchConfiguration getConfiguration();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setFilepattern(ValueProvider<String> filepattern);
+
+      abstract Builder setConfiguration(MatchConfiguration configuration);
+
+      abstract Match build();
+    }
+
+    /** Matches the given filepattern. */
+    public Match filepattern(String filepattern) {
+      return this.filepattern(StaticValueProvider.of(filepattern));

Review comment:
       If they're supposed to match a single table, maybe it's worth showing what they look like usually? (e.g. `gs://bucket-name/database/table/` or something like that)
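For reference, the path passed to `filepattern` is the root directory of one Delta table, e.g. `gs://bucket-name/database/table/`. Below is a minimal, dependency-free sketch of how the PR's code turns such a root into data-file paths: `getDeltaLog` rewrites `s3://` to `s3a://` before calling Delta Standalone, while `MatchOnceFn` appends each `AddFile` path to the original root. The class `DeltaTablePaths` and its methods are hypothetical (not part of the PR), and the sketch assumes, as the PR does, that `AddFile` paths are relative to the table root.

```java
/**
 * Hypothetical helper (not part of the PR) mirroring the path handling in
 * DeltaFileIO.MatchAll.getDeltaLog and MatchOnceFn.
 */
final class DeltaTablePaths {
  private DeltaTablePaths() {}

  /** Path handed to Delta Standalone: Hadoop's S3A connector is registered under s3a://. */
  static String forDeltaLog(String tableRoot) {
    return tableRoot.startsWith("s3://")
        ? "s3a://" + tableRoot.substring("s3://".length())
        : tableRoot;
  }

  /**
   * Path handed to Beam's FileSystems.match: the original table root joined with an AddFile
   * path, assumed (as in the PR) to be relative to the table root.
   */
  static String dataFile(String tableRoot, String addFilePath) {
    String separator = tableRoot.endsWith("/") ? "" : "/";
    return tableRoot + separator + addFilePath;
  }
}
```

Because the rewrite applies only to the Delta log, data-file paths keep the original `s3://` scheme, which is what Beam's own S3 filesystem expects.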

##########
File path: sdks/java/io/deltalake/src/main/java/org/apache/beam/sdk/io/DeltaFileIO.java
##########
@@ -0,0 +1,653 @@
+package org.apache.beam.sdk.io;
+
+import com.google.auto.value.AutoValue;
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.Snapshot;
+import io.delta.standalone.actions.AddFile;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Contextful;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Requirements;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Transforms for accessing Delta Lake files: listing files (matching) and reading.
+ *
+ * <h2>Getting a snapshot</h2>
+ *
+ * <p>{@link #snapshot} and {@link #matchAll} match filepatterns (respectively either a single
+ * filepattern or a {@link PCollection} thereof) and return the files that match them as {@link
+ * PCollection PCollections} of {@link MatchResult.Metadata}. Configuration options for them are in
+ * {@link MatchConfiguration} and include features such as treatment of filepatterns that don't
+ * match anything and continuous incremental matching of filepatterns (watching for new files).
+ *
+ * <h3>Example: Watching a Delta Lake table for new files</h3>
+ *
+ * <p>This example matches a single filepattern repeatedly every 30 seconds, continuously returns
+ * new matched files as an unbounded {@code PCollection<Metadata>} and stops if no new files appear
+ * for 1 hour.
+ *
+ * <pre>{@code
+ * PCollection<Metadata> matches = p.apply(DeltaFileIO.snapshot()
+ *     .filepattern("...")
+ *     .continuously(
+ *       Duration.standardSeconds(30), afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
+ * <h3>Example: Matching a Delta Lake table at a specific version</h3>
+ *
+ * <p>This example matches the Parquet data files that make up version 5 of a Delta Lake table.
+ *
+ * <pre>{@code
+ * PCollection<Metadata> matches = p.apply(DeltaFileIO.snapshot()
+ *     .filepattern("...")
+ *     .withVersion(5L));
+ * }</pre>
+ *
+ * <h2>Reading files</h2>
+ *
+ * <p>{@link #readMatches} converts each result of {@link #snapshot} or {@link #matchAll} to a {@link
+ * ReadableFile} that is convenient for reading a file's contents, optionally decompressing it.
+ *
+ * <h3>Example: Reading the Parquet files of the latest snapshot of a Delta Lake table</h3>
+ *
+ * <pre>{@code
+ * pipeline
+ *     .apply("Get Snapshot",
+ *         DeltaFileIO.snapshot()
+ *             .filepattern(filePattern)
+ *             .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
+ *     .apply("Read matched files", DeltaFileIO.readMatches())
+ *     .apply("Read parquet files", ParquetIO.readFiles(<SCHEMA>));
+ * }</pre>
+ *
+ */
+
+@SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class DeltaFileIO
+{
+  private static final Logger LOG = LoggerFactory.getLogger(DeltaFileIO.class);
+
+  /**
+   * Processes a filepattern using Delta Standalone and produces a collection of the Parquet data
+   * files in the table's snapshot as {@link MatchResult.Metadata}.
+   *
+   * <p>By default, matches the filepattern once and produces a bounded {@link PCollection}. To
+   * continuously watch the filepattern for new matches, use {@link Match#continuously(Duration,
+   * TerminationCondition)}; this produces an unbounded {@link PCollection}.
+   *
+   * <p>By default, a filepattern matching no resources is treated according to {@link
+   * EmptyMatchTreatment#DISALLOW}. To configure this behavior, use {@link
+   * Match#withEmptyMatchTreatment}.
+   *
+   * <p>Returned {@link MatchResult.Metadata} are deduplicated by filename. For example, if this
+   * transform observes a file with the same name several times with different metadata (e.g.
+   * because the file is growing), it will emit the metadata the first time this file is observed,
+   * and will ignore future changes to this file.
+   */
+  public static Match snapshot() {
+    return new AutoValue_DeltaFileIO_Match.Builder()
+        .setConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .build();
+  }
+
+  /**
+   * Like {@link #snapshot}, but matches each filepattern in a collection of filepatterns.
+   *
+   * <p>Resources are not deduplicated between filepatterns, i.e. if the same resource matches
+   * multiple filepatterns, it will be produced multiple times.
+   *
+   * <p>By default, a filepattern matching no resources is treated according to {@link
+   * EmptyMatchTreatment#ALLOW_IF_WILDCARD}. To configure this behavior, use {@link
+   * MatchAll#withEmptyMatchTreatment}.
+   */
+  public static MatchAll matchAll() {
+    return new AutoValue_DeltaFileIO_MatchAll.Builder()
+        .setConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .build();
+  }
+
+  /**
+   * Converts each result of {@link #snapshot} or {@link #matchAll} to a {@link ReadableFile} which can
+   * be used to read the contents of each file, optionally decompressing it.
+   */
+  public static ReadMatches readMatches() {
+    return new AutoValue_DeltaFileIO_ReadMatches.Builder()
+        .setCompression(Compression.AUTO)
+        .setDirectoryTreatment(ReadMatches.DirectoryTreatment.SKIP)
+        .build();
+  }
+
+
+  /**
+   * Describes configuration for matching filepatterns, such as {@link EmptyMatchTreatment} and
+   * continuous watching for matching files.
+   */
+  @AutoValue
+  public abstract static class MatchConfiguration implements HasDisplayData, Serializable {
+    /** Creates a {@link MatchConfiguration} with the given {@link EmptyMatchTreatment}. */
+    public static MatchConfiguration create(EmptyMatchTreatment emptyMatchTreatment) {
+      return new AutoValue_DeltaFileIO_MatchConfiguration.Builder()
+          .setEmptyMatchTreatment(emptyMatchTreatment)
+          .build();
+    }
+
+    public abstract EmptyMatchTreatment getEmptyMatchTreatment();
+
+    public abstract @Nullable Duration getWatchInterval();
+
+    abstract @Nullable TerminationCondition<String, ?> getWatchTerminationCondition();
+
+    abstract @Nullable Long getVersion();
+
+    abstract @Nullable Long getTimestamp();
+
+    abstract @Nullable SerializableConfiguration getHadoopConfiguration();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
+
+      abstract Builder setWatchInterval(Duration watchInterval);
+
+      abstract Builder setWatchTerminationCondition(TerminationCondition<String, ?> condition);
+
+      abstract Builder setVersion(Long version);
+
+      abstract Builder setTimestamp(Long timestamp);
+
+      abstract Builder setHadoopConfiguration(SerializableConfiguration hadoopConfiguration);
+
+      abstract MatchConfiguration build();
+    }
+
+    /** Sets the {@link EmptyMatchTreatment}. */
+    public MatchConfiguration withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
+    public MatchConfiguration withVersion(Long version) {
+      return toBuilder().setVersion(version).build();
+    }
+
+    public MatchConfiguration withTimestamp(Long timestamp) {
+      return toBuilder().setTimestamp(timestamp).build();
+    }
+
+    public MatchConfiguration withHadoopConfiguration(SerializableConfiguration hadoopConfiguration) {
+      return toBuilder().setHadoopConfiguration(hadoopConfiguration).build();
+    }
+
+    /**
+     * Continuously watches for new files at the given interval until the given termination
+     * condition is reached, where the input to the condition is the filepattern.
+     */
+    public MatchConfiguration continuously(
+        Duration interval, TerminationCondition<String, ?> condition) {
+      return toBuilder().setWatchInterval(interval).setWatchTerminationCondition(condition).build();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder
+          .add(
+              DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+                  .withLabel("Treatment of filepatterns that match no files"))
+          .addIfNotNull(
+              DisplayData.item("watchForNewFilesInterval", getWatchInterval())
+                  .withLabel("Interval to watch for new files"));
+    }
+  }
+
+  /** Implementation of {@link #snapshot}. */
+  @AutoValue
+  public abstract static class Match extends PTransform<PBegin, PCollection<MatchResult.Metadata>>
+  {
+    abstract @Nullable ValueProvider<String> getFilepattern();
+
+    abstract MatchConfiguration getConfiguration();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setFilepattern(ValueProvider<String> filepattern);
+
+      abstract Builder setConfiguration(MatchConfiguration configuration);
+
+      abstract Match build();
+    }
+
+    /** Matches the given filepattern. */
+    public Match filepattern(String filepattern) {
+      return this.filepattern(StaticValueProvider.of(filepattern));
+    }
+
+    /** Like {@link #filepattern(String)} but using a {@link ValueProvider}. */
+    public Match filepattern(ValueProvider<String> filepattern) {
+      return toBuilder().setFilepattern(filepattern).build();
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public Match withConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setConfiguration(configuration).build();
+    }
+
+    /** See {@link MatchConfiguration#withEmptyMatchTreatment(EmptyMatchTreatment)}. */
+    public Match withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withConfiguration(getConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    public Match withVersion(Long version) {
+      return withConfiguration(getConfiguration().withVersion(version));
+    }
+
+    public Match withTimestamp(Long timestamp) {
+      return withConfiguration(getConfiguration().withTimestamp(timestamp));
+    }
+
+    public Match withHadoopConfiguration(Configuration hadoopConfiguration) {
+      return withConfiguration(getConfiguration()
+          .withHadoopConfiguration(new SerializableConfiguration(hadoopConfiguration)));
+    }
+
+    /**
+     * See {@link MatchConfiguration#continuously}. The returned {@link PCollection} is unbounded.
+     *
+     * <p>This works only in runners supporting splittable {@link
+     * org.apache.beam.sdk.transforms.DoFn}.
+     */
+    public Match continuously(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return withConfiguration(getConfiguration().continuously(pollInterval, terminationCondition));
+    }
+
+    @Override
+    public PCollection<MatchResult.Metadata> expand(PBegin input) {
+      return input
+          .apply("Delta Create Filepattern",
+              Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+          .apply("Delta Via MatchAll", matchAll().withConfiguration(getConfiguration()));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(
+              DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
+          .include("configuration", getConfiguration());
+    }
+  }
+
+  /** Implementation of {@link #matchAll}. */
+  @AutoValue
+  public abstract static class MatchAll
+      extends PTransform<PCollection<String>, PCollection<MatchResult.Metadata>>
+  {
+    private static final Logger logger = LoggerFactory.getLogger(MatchAll.class);
+
+    abstract MatchConfiguration getConfiguration();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setConfiguration(MatchConfiguration configuration);
+
+      abstract MatchAll build();
+    }
+
+    /** Like {@link Match#withConfiguration}. */
+    public MatchAll withConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setConfiguration(configuration).build();
+    }
+
+    /** Like {@link Match#withEmptyMatchTreatment}. */
+    public MatchAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withConfiguration(getConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /** Like {@link Match#continuously}. */
+    public MatchAll continuously(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return withConfiguration(getConfiguration().continuously(pollInterval, terminationCondition));
+    }
+
+    @Override
+    public PCollection<MatchResult.Metadata> expand(PCollection<String> input)
+    {
+      logger.info("matchAll config: {}", getConfiguration());
+
+      PCollection<MatchResult.Metadata> res;
+      if (getConfiguration().getWatchInterval() == null) {
+        res =
+            input.apply(
+                "Delta MatchAll FilePatterns",
+                ParDo.of(new MatchOnceFn(getConfiguration())));
+      } else {
+        res =
+            input.apply(
+                    "Delta Continuously MatchAll filePatterns",
+                    Watch.growthOf(
+                            Contextful.of(new MatchPollFn(getConfiguration()), Requirements.empty()),
+                            new ExtractFilenameFn())
+                        .withPollInterval(getConfiguration().getWatchInterval())
+                        .withTerminationPerInput(getConfiguration().getWatchTerminationCondition()))
+                .apply(Values.create());
+      }
+      return res.apply("Delta Reshuffle files", Reshuffle.viaRandomKey());
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.include("configuration", getConfiguration());
+    }
+
+    private static class MatchOnceFn extends DoFn<String, MatchResult.Metadata>
+    {
+      private final MatchConfiguration matchConfiguration;
+
+      MatchOnceFn(MatchConfiguration matchConfiguration) {
+        this.matchConfiguration = matchConfiguration;
+      }
+
+      @ProcessElement
+      public void process(ProcessContext c) throws Exception
+      {
+        LOG.info("DeltaFileIO:MatchOnceFn processing {}", matchConfiguration);
+
+        String filePattern = c.element();
+
+        Configuration hadoopConfiguration = (matchConfiguration.getHadoopConfiguration() != null) ?
+            matchConfiguration.getHadoopConfiguration().get() : new Configuration();
+
+        DeltaLog deltaLog = getDeltaLog(filePattern, hadoopConfiguration);
+        Snapshot deltaSnapshot = getDeltaSnapshot(deltaLog, matchConfiguration);
+
+        List<AddFile> deltaFiles = deltaSnapshot.getAllFiles();
+
+        LOG.info("DeltaFileIO:MatchOnceFn DeltaLog.forTable: version={}, numberOfFiles={}",
+            deltaSnapshot.getVersion(), deltaFiles.size());
+
+        String separator = filePattern.endsWith("/") ? "" : "/";
+
+        for (AddFile file : deltaFiles) {
+          String fullPath = filePattern + separator + file.getPath();
+          MatchResult match = FileSystems.match(fullPath, matchConfiguration.getEmptyMatchTreatment());
+          LOG.info("DeltaFileIO will process {}", match);
+          for (MatchResult.Metadata metadata : match.metadata()) {
+            c.output(metadata);
+          }
+        }
+      }
+    }
+
+    static DeltaLog getDeltaLog(String filePattern, Configuration hadoopConfiguration)
+    {
+      // Delta Standalone reads the table through the Hadoop FileSystem API, whose S3 connector is
+      // registered under the s3a:// scheme, so rewrite s3:// paths accordingly.
+      String deltaFilePattern = (filePattern.startsWith("s3://")) ?
+          filePattern.replaceFirst("s3://", "s3a://") : filePattern;
+      LOG.info("DeltaFileIO trying DeltaLog.forTable for pattern {}", deltaFilePattern);
+
+      DeltaLog deltaLog = DeltaLog.forTable(hadoopConfiguration, deltaFilePattern);
+      return deltaLog;
+    }
+
+    static Snapshot getDeltaSnapshot(DeltaLog deltaLog, MatchConfiguration matchConfiguration)
+    {
+      Snapshot deltaSnapshot;
+
+      if (matchConfiguration.getVersion() != null) {
+        logger.info("DeltaFileIO creating snapshot for version {}", matchConfiguration.getVersion());
+        deltaSnapshot = deltaLog.getSnapshotForVersionAsOf(matchConfiguration.getVersion());
+      } else if (matchConfiguration.getTimestamp() != null) {
+        logger.info("DeltaFileIO creating snapshot for timestamp {}", matchConfiguration.getTimestamp());
+        deltaSnapshot = deltaLog.getSnapshotForTimestampAsOf(matchConfiguration.getTimestamp());
+      } else {
+        logger.info("DeltaFileIO creating snapshot for the latest version");
+        deltaSnapshot = deltaLog.snapshot();
+      }
+
+      return deltaSnapshot;
+    }
+
+    private static class MatchPollFn extends PollFn<String, MatchResult.Metadata>
+    {
+      private final MatchConfiguration matchConfiguration;
+      private final long pollDutationMs;
+
+      private transient DeltaLog deltaLog = null;
+      private transient long deltaSnapshotVersion = -1;
+
+      MatchPollFn(MatchConfiguration matchConfiguration) {
+        this.matchConfiguration = matchConfiguration;
+        pollDutationMs = matchConfiguration.getWatchInterval().getMillis();
+        LOG.info("DeltaFileIO.MatchPollFn created, pollDutationMs={}", pollDutationMs);
+      }
+
+      @Override
+      public Watch.Growth.PollResult<MatchResult.Metadata> apply(String filePattern, Context c) throws Exception
+      {
+        Instant now = Instant.now();
+
+        List<MatchResult.Metadata> filesMetadata;
+
+        Snapshot deltaSnapshot;
+        if (deltaLog == null) {
+          Configuration hadoopConfiguration = (matchConfiguration.getHadoopConfiguration() != null) ?
+              matchConfiguration.getHadoopConfiguration().get() : new Configuration();
+          deltaLog = getDeltaLog(filePattern, hadoopConfiguration);
+
+          deltaSnapshot = getDeltaSnapshot(deltaLog, matchConfiguration);
+        } else {
+          deltaSnapshot = deltaLog.update();
+        }
+
+        if (deltaSnapshot.getVersion() == deltaSnapshotVersion) {
+          // LOG.info("DeltaFileIO.MatchPollFn: No update for deltaSnapshotVersion={}", deltaSnapshotVersion);
+          filesMetadata = Collections.emptyList();
+        } else {
+          List<AddFile> deltaFiles = deltaSnapshot.getAllFiles();
+          LOG.info("DeltaFileIO.MatchPollFn: DeltaLog.updates, version={}, numberOfFiles={}",

Review comment:
       same. debug?
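On the logging question: the messages in `MatchPollFn` fire once per poll interval, which is why a debug level fits. The sketch below is a hypothetical, dependency-free restatement of the version check at that point (`VersionGatedPoller` is not in the PR, and `java.util.logging` at `Level.FINE` stands in for SLF4J's `debug`): files are emitted only when the Delta snapshot version differs from the last one seen, and, as in the PR, `Watch` would then deduplicate the emitted files by filename.

```java
import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch (not part of the PR) of MatchPollFn's version check. */
final class VersionGatedPoller {
  private static final Logger LOG = Logger.getLogger(VersionGatedPoller.class.getName());

  // Sentinel for "nothing polled yet"; Delta table versions start at 0.
  private long lastSeenVersion = -1L;

  /** Returns the files to emit for this poll, or an empty list if the version is unchanged. */
  List<String> poll(long snapshotVersion, List<String> snapshotFiles) {
    if (snapshotVersion == lastSeenVersion) {
      // Fires on every idle poll, so keep it at a debug-like level.
      LOG.log(Level.FINE, "No update for snapshot version {0}", snapshotVersion);
      return Collections.emptyList();
    }
    LOG.log(Level.FINE, "Snapshot version {0} has {1} files",
        new Object[] {snapshotVersion, snapshotFiles.size()});
    lastSeenVersion = snapshotVersion;
    return snapshotFiles;
  }
}
```

In the sketch the `-1` sentinel comes from a field initializer. In `MatchPollFn` the corresponding field is `transient`, and Java deserialization does not run field initializers for serializable classes, so a deserialized instance would likely start at `0` rather than `-1`; for a table still at version 0, the first poll would then be treated as "no update".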

##########
File path: sdks/java/io/deltalake/src/main/java/org/apache/beam/sdk/io/DeltaFileIO.java
##########
@@ -0,0 +1,653 @@
+package org.apache.beam.sdk.io;
+
+import com.google.auto.value.AutoValue;
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.Snapshot;
+import io.delta.standalone.actions.AddFile;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Contextful;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Requirements;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Transforms for accessing Delta Lake files: listing files (matching) and reading.
+ *
+ * <h2>Getting a snapshot</h2>
+ *
+ * <p>{@link #snapshot} and {@link #matchAll} match filepatterns (respectively either a single
+ * filepattern or a {@link PCollection} thereof) and return the files that match them as {@link
+ * PCollection PCollections} of {@link MatchResult.Metadata}. Configuration options for them are in
+ * {@link MatchConfiguration} and include features such as treatment of filepatterns that don't
+ * match anything and continuous incremental matching of filepatterns (watching for new files).
+ *
+ * <h3>Example: Watching a Delta Lake table for new files</h3>
+ *
+ * <p>This example matches a single filepattern repeatedly every 30 seconds, continuously returns
+ * new matched files as an unbounded {@code PCollection<Metadata>} and stops if no new files appear
+ * for 1 hour.
+ *
+ * <pre>{@code
+ * PCollection<Metadata> matches = p.apply(DeltaFileIO.snapshot()
+ *     .filepattern("...")
+ *     .continuously(
+ *       Duration.standardSeconds(30), afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
+ * <h3>Example: Matching a Delta Lake table at a specific version</h3>
+ *
+ * <p>This example matches the Parquet data files that make up version 5 of a Delta Lake table.
+ *
+ * <pre>{@code
+ * PCollection<Metadata> matches = p.apply(DeltaFileIO.snapshot()
+ *     .filepattern("...")
+ *     .withVersion(5L));
+ * }</pre>
+ *
+ * <h2>Reading files</h2>
+ *
+ * <p>{@link #readMatches} converts each result of {@link #snapshot} or {@link #matchAll} to a {@link
+ * ReadableFile} that is convenient for reading a file's contents, optionally decompressing it.
+ *
+ * <h3>Example: Reading the Parquet files of the latest snapshot of a Delta Lake table</h3>
+ *
+ * <pre>{@code
+ * pipeline
+ *     .apply("Get Snapshot",
+ *         DeltaFileIO.snapshot()
+ *             .filepattern(filePattern)
+ *             .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
+ *     .apply("Read matched files", DeltaFileIO.readMatches())
+ *     .apply("Read parquet files", ParquetIO.readFiles(<SCHEMA>));
+ * }</pre>
+ *
+ */
+
+@SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class DeltaFileIO
+{
+  private static final Logger LOG = LoggerFactory.getLogger(DeltaFileIO.class);
+
+  /**
+   * Processes a filepattern using Delta Standalone and produces a collection of the Parquet data
+   * files in the table's snapshot as {@link MatchResult.Metadata}.
+   *
+   * <p>By default, matches the filepattern once and produces a bounded {@link PCollection}. To
+   * continuously watch the filepattern for new matches, use {@link Match#continuously(Duration,
+   * TerminationCondition)}; this produces an unbounded {@link PCollection}.
+   *
+   * <p>By default, a filepattern matching no resources is treated according to {@link
+   * EmptyMatchTreatment#DISALLOW}. To configure this behavior, use {@link
+   * Match#withEmptyMatchTreatment}.
+   *
+   * <p>Returned {@link MatchResult.Metadata} are deduplicated by filename. For example, if this
+   * transform observes a file with the same name several times with different metadata (e.g.
+   * because the file is growing), it will emit the metadata the first time this file is observed,
+   * and will ignore future changes to this file.
+   */
+  public static Match snapshot() {
+    return new AutoValue_DeltaFileIO_Match.Builder()
+        .setConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .build();
+  }
+
+  /**
+   * Like {@link #snapshot}, but matches each filepattern in a collection of filepatterns.
+   *
+   * <p>Resources are not deduplicated between filepatterns, i.e. if the same resource matches
+   * multiple filepatterns, it will be produced multiple times.
+   *
+   * <p>By default, a filepattern matching no resources is treated according to {@link
+   * EmptyMatchTreatment#ALLOW_IF_WILDCARD}. To configure this behavior, use {@link
+   * MatchAll#withEmptyMatchTreatment}.
+   */
+  public static MatchAll matchAll() {
+    return new AutoValue_DeltaFileIO_MatchAll.Builder()
+        .setConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .build();
+  }
+
+  /**
+   * Converts each result of {@link #snapshot} or {@link #matchAll} to a {@link ReadableFile} which can
+   * be used to read the contents of each file, optionally decompressing it.
+   */
+  public static ReadMatches readMatches() {
+    return new AutoValue_DeltaFileIO_ReadMatches.Builder()
+        .setCompression(Compression.AUTO)
+        .setDirectoryTreatment(ReadMatches.DirectoryTreatment.SKIP)
+        .build();
+  }
+
+
+  /**
+   * Describes configuration for matching filepatterns, such as {@link EmptyMatchTreatment} and
+   * continuous watching for matching files.
+   */
+  @AutoValue
+  public abstract static class MatchConfiguration implements HasDisplayData, Serializable {
+    /** Creates a {@link MatchConfiguration} with the given {@link EmptyMatchTreatment}. */
+    public static MatchConfiguration create(EmptyMatchTreatment emptyMatchTreatment) {
+      return new AutoValue_DeltaFileIO_MatchConfiguration.Builder()
+          .setEmptyMatchTreatment(emptyMatchTreatment)
+          .build();
+    }
+
+    public abstract EmptyMatchTreatment getEmptyMatchTreatment();
+
+    public abstract @Nullable Duration getWatchInterval();
+
+    abstract @Nullable TerminationCondition<String, ?> getWatchTerminationCondition();
+
+    abstract @Nullable Long getVersion();
+
+    abstract @Nullable Long getTimestamp();
+
+    abstract @Nullable SerializableConfiguration getHadoopConfiguration();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
+
+      abstract Builder setWatchInterval(Duration watchInterval);
+
+      abstract Builder setWatchTerminationCondition(TerminationCondition<String, ?> condition);
+
+      abstract Builder setVersion(Long version);
+
+      abstract Builder setTimestamp(Long timestamp);
+
+      abstract Builder setHadoopConfiguration(SerializableConfiguration hadoopConfiguration);
+
+      abstract MatchConfiguration build();
+    }
+
+    /** Sets the {@link EmptyMatchTreatment}. */
+    public MatchConfiguration withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
+    public MatchConfiguration withVersion(Long version) {
+      return toBuilder().setVersion(version).build();
+    }
+
+    public MatchConfiguration withTimestamp(Long timestamp) {
+      return toBuilder().setTimestamp(timestamp).build();
+    }
+
+    public MatchConfiguration withHadoopConfiguration(SerializableConfiguration hadoopConfiguration) {
+      return toBuilder().setHadoopConfiguration(hadoopConfiguration).build();
+    }
+
+    /**
+     * Continuously watches for new files at the given interval until the given termination
+     * condition is reached, where the input to the condition is the filepattern.
+     */
+    public MatchConfiguration continuously(
+        Duration interval, TerminationCondition<String, ?> condition) {
+      return toBuilder().setWatchInterval(interval).setWatchTerminationCondition(condition).build();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder
+          .add(
+              DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+                  .withLabel("Treatment of filepatterns that match no files"))
+          .addIfNotNull(
+              DisplayData.item("watchForNewFilesInterval", getWatchInterval())
+                  .withLabel("Interval to watch for new files"));
+    }
+  }
+
+  /** Implementation of {@link #snapshot}. */
+  @AutoValue
+  public abstract static class Match extends PTransform<PBegin, PCollection<MatchResult.Metadata>>
+  {
+    abstract @Nullable ValueProvider<String> getFilepattern();
+
+    abstract MatchConfiguration getConfiguration();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setFilepattern(ValueProvider<String> filepattern);
+
+      abstract Builder setConfiguration(MatchConfiguration configuration);
+
+      abstract Match build();
+    }
+
+    /** Matches the given filepattern. */
+    public Match filepattern(String filepattern) {
+      return this.filepattern(StaticValueProvider.of(filepattern));

Review comment:
       Are file patterns expected to match a single Delta Lake table, or can they match multiple tables? It may be worth clarifying this in the Javadoc.
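Going by `MatchOnceFn` in this diff, each input string is passed (after an s3:// to s3a:// rewrite in `getDeltaLog`) straight to `DeltaLog.forTable`, and data file paths are built by appending each `AddFile` path to that same string. No glob expansion is visible, so each string appears to name a single table root. The stdlib-only sketch below models just that path handling to make the question concrete; `DeltaPathSketch`, `toHadoopTablePath`, and `resolveDataFiles` are hypothetical names, not part of the PR or of Delta Standalone.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, stdlib-only model of the path handling in getDeltaLog and MatchOnceFn:
 * each input string is treated as the root of exactly one Delta Lake table.
 */
public class DeltaPathSketch {

  /** Delta Standalone goes through Hadoop FileSystem, which expects s3a:// instead of s3://. */
  static String toHadoopTablePath(String tableRoot) {
    return tableRoot.startsWith("s3://")
        ? "s3a://" + tableRoot.substring("s3://".length())
        : tableRoot;
  }

  /** Joins the table root with each relative AddFile path, adding "/" only when needed. */
  static List<String> resolveDataFiles(String tableRoot, List<String> relativePaths) {
    String separator = tableRoot.endsWith("/") ? "" : "/";
    List<String> fullPaths = new ArrayList<>();
    for (String relative : relativePaths) {
      // The original (non-rewritten) root is kept, since these paths go to Beam's FileSystems.
      fullPaths.add(tableRoot + separator + relative);
    }
    return fullPaths;
  }

  public static void main(String[] args) {
    String tableRoot = "s3://bucket/events"; // hypothetical table location
    System.out.println(toHadoopTablePath(tableRoot));
    System.out.println(
        resolveDataFiles(
            tableRoot, List.of("part-00000.parquet", "date=2021-10-19/part-00001.parquet")));
  }
}
```

Under this reading, a wildcard such as `gs://bucket/*` would be handed to `DeltaLog.forTable` literally rather than expanded, which is why stating the single-table contract in the Javadoc seems worthwhile.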

##########
File path: sdks/java/io/deltalake/src/main/java/org/apache/beam/sdk/io/DeltaFileIO.java
##########
@@ -0,0 +1,653 @@
+package org.apache.beam.sdk.io;
+
+import com.google.auto.value.AutoValue;
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.Snapshot;
+import io.delta.standalone.actions.AddFile;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Contextful;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Requirements;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Transforms for accessing Delta Lake files: listing files (matching) and reading.
+ *
+ * <h2>Getting a snapshot</h2>
+ *
+ * <p>{@link #snapshot} and {@link #matchAll} match filepatterns (respectively either a single
+ * filepattern or a {@link PCollection} thereof) and return the files that match them as {@link
+ * PCollection PCollections} of {@link MatchResult.Metadata}. Configuration options for them are in
+ * {@link MatchConfiguration} and include features such as treatment of filepatterns that don't
+ * match anything and continuous incremental matching of filepatterns (watching for new files).
+ *
+ * <h3>Example: Watching a Delta Lake table for new files</h3>
+ *
+ * <p>This example matches a single filepattern repeatedly every 30 seconds, continuously returns
+ * new matched files as an unbounded {@code PCollection<Metadata>} and stops if no new files appear
+ * for 1 hour.
+ *
+ * <pre>{@code
+ * PCollection<Metadata> matches = p.apply(DeltaFileIO.snapshot()
+ *     .filepattern("...")
+ *     .continuously(
+ *       Duration.standardSeconds(30), afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
+ * <h3>Example: Matching a Delta Lake table at a specific version</h3>
+ *
+ * <p>This example matches the Parquet files that make up version 5 of a Delta Lake table.
+ *
+ * <pre>{@code
+ * PCollection<Metadata> matches = p.apply(DeltaFileIO.snapshot()
+ *     .filepattern("...")
+ *     .withVersion(5L));
+ * }</pre>
+ *
+ * <h2>Reading files</h2>
+ *
+ * <p>{@link #readMatches} converts each result of {@link #snapshot} or {@link #matchAll} to a {@link
+ * ReadableFile} that is convenient for reading a file's contents, optionally decompressing it.
+ *
+ * <h3>Example: Reading Parquet files from the latest Delta Lake snapshot</h3>
+ *
+ * <pre>{@code
+     pipeline
+      .apply("Get Snapshot",
+        DeltaFileIO.snapshot()
+            .filepattern(filePattern)
+            .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW)
+      )
+      .apply("Read matched files",
+            DeltaFileIO.readMatches()
+      )
+      .apply("Read parquet files",
+          ParquetIO.readFiles(schema) // schema: Avro Schema of the table's records
+      );
+ * }</pre>
+ *
+ */
+
+@SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class DeltaFileIO
+{
+  private static final Logger LOG = LoggerFactory.getLogger(DeltaFileIO.class);
+
+  /**
+   * Processes a filepattern using Delta Standalone and produces a collection of Parquet files
+   * as {@link MatchResult.Metadata}.
+   *
+   * <p>By default, matches the filepattern once and produces a bounded {@link PCollection}. To
+   * continuously watch the filepattern for new matches, use {@link MatchAll#continuously(Duration,
+   * TerminationCondition)} - this will produce an unbounded {@link PCollection}.
+   *
+   * <p>By default, a filepattern matching no resources is treated according to {@link
+   * EmptyMatchTreatment#DISALLOW}. To configure this behavior, use {@link
+   * Match#withEmptyMatchTreatment}.
+   *
+   * <p>Returned {@link MatchResult.Metadata} are deduplicated by filename. For example, if this
+   * transform observes a file with the same name several times with different metadata (e.g.
+   * because the file is growing), it will emit the metadata the first time this file is observed,
+   * and will ignore future changes to this file.
+   */
+  public static Match snapshot() {
+    return new AutoValue_DeltaFileIO_Match.Builder()
+        .setConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .build();
+  }
+
+  /**
+   * Like {@link #snapshot}, but matches each filepattern in a collection of filepatterns.
+   *
+   * <p>Resources are not deduplicated between filepatterns, i.e. if the same resource matches
+   * multiple filepatterns, it will be produced multiple times.
+   *
+   * <p>By default, a filepattern matching no resources is treated according to {@link
+   * EmptyMatchTreatment#ALLOW_IF_WILDCARD}. To configure this behavior, use {@link
+   * MatchAll#withEmptyMatchTreatment}.
+   */
+  public static MatchAll matchAll() {
+    return new AutoValue_DeltaFileIO_MatchAll.Builder()
+        .setConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .build();
+  }
+
+  /**
+   * Converts each result of {@link #snapshot} or {@link #matchAll} to a {@link ReadableFile} which can
+   * be used to read the contents of each file, optionally decompressing it.
+   */
+  public static ReadMatches readMatches() {
+    return new AutoValue_DeltaFileIO_ReadMatches.Builder()
+        .setCompression(Compression.AUTO)
+        .setDirectoryTreatment(ReadMatches.DirectoryTreatment.SKIP)
+        .build();
+  }
+
+
+  /**
+   * Describes configuration for matching filepatterns, such as {@link EmptyMatchTreatment} and
+   * continuous watching for matching files.
+   */
+  @AutoValue
+  public abstract static class MatchConfiguration implements HasDisplayData, Serializable {
+    /** Creates a {@link MatchConfiguration} with the given {@link EmptyMatchTreatment}. */
+    public static MatchConfiguration create(EmptyMatchTreatment emptyMatchTreatment) {
+      return new AutoValue_DeltaFileIO_MatchConfiguration.Builder()
+          .setEmptyMatchTreatment(emptyMatchTreatment)
+          .build();
+    }
+
+    public abstract EmptyMatchTreatment getEmptyMatchTreatment();
+
+    public abstract @Nullable Duration getWatchInterval();
+
+    abstract @Nullable TerminationCondition<String, ?> getWatchTerminationCondition();
+
+    abstract @Nullable Long getVersion();
+
+    abstract @Nullable Long getTimestamp();
+
+    abstract @Nullable SerializableConfiguration getHadoopConfiguration();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
+
+      abstract Builder setWatchInterval(Duration watchInterval);
+
+      abstract Builder setWatchTerminationCondition(TerminationCondition<String, ?> condition);
+
+      abstract Builder setVersion(Long version);
+
+      abstract Builder setTimestamp(Long timestamp);
+
+      abstract Builder setHadoopConfiguration(SerializableConfiguration hadoopConfiguration);
+
+      abstract MatchConfiguration build();
+    }
+
+    /** Sets the {@link EmptyMatchTreatment}. */
+    public MatchConfiguration withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
+    public MatchConfiguration withVersion(Long version) {
+      return toBuilder().setVersion(version).build();
+    }
+
+    public MatchConfiguration withTimestamp(Long timestamp) {
+      return toBuilder().setTimestamp(timestamp).build();
+    }
+
+    public MatchConfiguration withHadoopConfiguration(SerializableConfiguration hadoopConfiguration) {
+      return toBuilder().setHadoopConfiguration(hadoopConfiguration).build();
+    }
+
+    /**
+     * Continuously watches for new files at the given interval until the given termination
+     * condition is reached, where the input to the condition is the filepattern.
+     */
+    public MatchConfiguration continuously(
+        Duration interval, TerminationCondition<String, ?> condition) {
+      return toBuilder().setWatchInterval(interval).setWatchTerminationCondition(condition).build();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder
+          .add(
+              DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+                  .withLabel("Treatment of filepatterns that match no files"))
+          .addIfNotNull(
+              DisplayData.item("watchForNewFilesInterval", getWatchInterval())
+                  .withLabel("Interval to watch for new files"));
+    }
+  }
+
+  /** Implementation of {@link #snapshot}. */
+  @AutoValue
+  public abstract static class Match extends PTransform<PBegin, PCollection<MatchResult.Metadata>>
+  {
+    abstract @Nullable ValueProvider<String> getFilepattern();
+
+    abstract MatchConfiguration getConfiguration();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setFilepattern(ValueProvider<String> filepattern);
+
+      abstract Builder setConfiguration(MatchConfiguration configuration);
+
+      abstract Match build();
+    }
+
+    /** Matches the given filepattern. */
+    public Match filepattern(String filepattern) {
+      return this.filepattern(StaticValueProvider.of(filepattern));
+    }
+
+    /** Like {@link #filepattern(String)} but using a {@link ValueProvider}. */
+    public Match filepattern(ValueProvider<String> filepattern) {
+      return toBuilder().setFilepattern(filepattern).build();
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public Match withConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setConfiguration(configuration).build();
+    }
+
+    /** See {@link MatchConfiguration#withEmptyMatchTreatment(EmptyMatchTreatment)}. */
+    public Match withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withConfiguration(getConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    public Match withVersion(Long version) {
+      return withConfiguration(getConfiguration().withVersion(version));
+    }
+
+    public Match withTimestamp(Long timestamp) {
+      return withConfiguration(getConfiguration().withTimestamp(timestamp));
+    }
+
+    public Match withHadoopConfiguration(Configuration hadoopConfiguration) {
+      return withConfiguration(getConfiguration()
+          .withHadoopConfiguration(new SerializableConfiguration(hadoopConfiguration)));
+    }
+
+    /**
+     * See {@link MatchConfiguration#continuously}. The returned {@link PCollection} is unbounded.
+     *
+     * <p>This works only in runners supporting splittable {@link
+     * org.apache.beam.sdk.transforms.DoFn}.
+     */
+    public Match continuously(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return withConfiguration(getConfiguration().continuously(pollInterval, terminationCondition));
+    }
+
+    @Override
+    public PCollection<MatchResult.Metadata> expand(PBegin input) {
+      return input
+          .apply("Delta Create Filepattern",
+              Create.ofProvider(getFilepattern(), StringUtf8Coder.of())
+          )
+          .apply("Delta Via MatchAll",
+              matchAll().withConfiguration(getConfiguration())
+          )
+      ;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(
+              DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
+          .include("configuration", getConfiguration());
+    }
+  }
+
+  /** Implementation of {@link #matchAll}. */
+  @AutoValue
+  public abstract static class MatchAll
+      extends PTransform<PCollection<String>, PCollection<MatchResult.Metadata>>
+  {
+    private static final Logger logger = LoggerFactory.getLogger(MatchAll.class);
+
+    abstract MatchConfiguration getConfiguration();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setConfiguration(MatchConfiguration configuration);
+
+      abstract MatchAll build();
+    }
+
+    /** Like {@link Match#withConfiguration}. */
+    public MatchAll withConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setConfiguration(configuration).build();
+    }
+
+    /** Like {@link Match#withEmptyMatchTreatment}. */
+    public MatchAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withConfiguration(getConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /** Like {@link Match#continuously}. */
+    public MatchAll continuously(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return withConfiguration(getConfiguration().continuously(pollInterval, terminationCondition));
+    }
+
+    @Override
+    public PCollection<MatchResult.Metadata> expand(PCollection<String> input)
+    {
+      logger.info("matchAll config: {}", getConfiguration());
+
+      PCollection<MatchResult.Metadata> res;
+      if (getConfiguration().getWatchInterval() == null) {
+        res =
+            input.apply(
+                "Delta MatchAll FilePatterns",
+                ParDo.of(new MatchOnceFn(getConfiguration())));
+      } else {
+        res =
+            input.apply(
+                    "Delta Continuously MatchAll filePatterns",
+                    Watch.growthOf(
+                            Contextful.of(new MatchPollFn(getConfiguration()), Requirements.empty()),
+                            new ExtractFilenameFn())
+                        .withPollInterval(getConfiguration().getWatchInterval())
+                        .withTerminationPerInput(getConfiguration().getWatchTerminationCondition()))
+                .apply(Values.create());
+      }
+      return res.apply("Delta Reshuffle files", Reshuffle.viaRandomKey());
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.include("configuration", getConfiguration());
+    }
+
+    private static class MatchOnceFn extends DoFn<String, MatchResult.Metadata>
+    {
+      private final MatchConfiguration matchConfiguration;
+
+      MatchOnceFn(MatchConfiguration matchConfiguration) {
+        this.matchConfiguration = matchConfiguration;
+      }
+
+      @ProcessElement
+      public void process(ProcessContext c) throws Exception
+      {
+        LOG.info("DeltaFileIO:MatchOnceFn processing {}", matchConfiguration);
+
+        String filePattern = c.element();
+
+        Configuration hadoopConfiguration = (matchConfiguration.getHadoopConfiguration() != null) ?
+            matchConfiguration.getHadoopConfiguration().get() : new Configuration();
+
+        DeltaLog deltaLog = getDeltaLog(filePattern, hadoopConfiguration);
+        Snapshot deltaSnapshot = getDeltaSnapshot(deltaLog, matchConfiguration);
+
+        List<AddFile> deltaFiles = deltaSnapshot.getAllFiles();
+
+        LOG.info("DeltaFileIO:MatchOnceFn DeltaLog.forTable: version={}, numberOfFiles={}",
+            deltaSnapshot.getVersion(), deltaFiles.size());
+
+        String separator = filePattern.endsWith("/") ? "" : "/";
+
+        for (AddFile file : deltaFiles) {
+          String fullPath = filePattern + separator + file.getPath();
+          MatchResult match = FileSystems.match(fullPath, matchConfiguration.getEmptyMatchTreatment());
+          LOG.info("DeltaFileIO will process {}", match);
+          for (MatchResult.Metadata metadata : match.metadata()) {
+            c.output(metadata);
+          }
+        }
+      }
+    }
+
+    static DeltaLog getDeltaLog(String filePattern, Configuration hadoopConfiguration)
+    {
+      // Delta Standalone uses the Hadoop FileSystem API, so rewrite the s3:// scheme to s3a://
+      String deltaFilePattern = (filePattern.startsWith("s3://")) ?
+          filePattern.replaceFirst("s3://", "s3a://") : filePattern;
+      LOG.info("DeltaFileIO trying DeltaLog.forTable for pattern {}", deltaFilePattern);
+
+      DeltaLog deltaLog = DeltaLog.forTable(hadoopConfiguration, deltaFilePattern);
+      return deltaLog;
+    }
+
+    static Snapshot getDeltaSnapshot(DeltaLog deltaLog, MatchConfiguration matchConfiguration)
+    {
+      Snapshot deltaSnapshot;
+
+      if (matchConfiguration.getVersion() != null) {
+        logger.info("DeltaFileIO creating snapshot for version {}", matchConfiguration.getVersion());
+        deltaSnapshot = deltaLog.getSnapshotForVersionAsOf(matchConfiguration.getVersion());
+      } else if (matchConfiguration.getTimestamp() != null) {
+        logger.info("DeltaFileIO creating snapshot for timestamp {}", matchConfiguration.getTimestamp());
+        deltaSnapshot = deltaLog.getSnapshotForTimestampAsOf(matchConfiguration.getTimestamp());
+      } else {
+        logger.info("DeltaFileIO creating snapshot for the latest version");
+        deltaSnapshot = deltaLog.snapshot();
+      }
+
+      return deltaSnapshot;
+    }
+
+    private static class MatchPollFn extends PollFn<String, MatchResult.Metadata>
+    {
+      private final MatchConfiguration matchConfiguration;
+      private final long pollDutationMs;
+
+      private transient DeltaLog deltaLog = null;
+      private transient long deltaSnapshotVersion = -1;
+
+      MatchPollFn(MatchConfiguration matchConfiguration) {
+        this.matchConfiguration = matchConfiguration;
+        pollDutationMs =  matchConfiguration.getWatchInterval().getStandardSeconds() * 1000L;
+        LOG.info("DeltaFileIO.MatchPollFn created, pollDutationMs=" + pollDutationMs);
+      }
+
+      @Override
+      public Watch.Growth.PollResult<MatchResult.Metadata> apply(String filePattern, Context c) throws Exception
+      {
+        Instant now = Instant.now();
+
+        List<MatchResult.Metadata> filesMetadata;
+
+        Snapshot deltaSnapshot;
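+        if (deltaLog == null) {
+          // Transient field initializers do not run on deserialization; reset the sentinel here.
+          deltaSnapshotVersion = -1;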
+          Configuration hadoopConfiguration = (matchConfiguration.getHadoopConfiguration() != null) ?
+              matchConfiguration.getHadoopConfiguration().get() : new Configuration();
+          deltaLog = getDeltaLog(filePattern, hadoopConfiguration);
+
+          deltaSnapshot = getDeltaSnapshot(deltaLog, matchConfiguration);
+        } else {
+          deltaSnapshot = deltaLog.update();
+        }
+
+        if (deltaSnapshot.getVersion() == deltaSnapshotVersion) {
+          // LOG.info("DeltaFileIO.MatchPollFn: No update for deltaSnapshotVersion={}", deltaSnapshotVersion);
+          filesMetadata = Collections.emptyList();
+        } else {
+          List<AddFile> deltaFiles = deltaSnapshot.getAllFiles();
+          LOG.info("DeltaFileIO.MatchPollFn: DeltaLog.updates, version={}, numberOfFiles={}",
+              deltaSnapshot.getVersion(), deltaFiles.size());
+
+          String separator = filePattern.endsWith("/") ? "" : "/";
+          filesMetadata = new ArrayList<>();
+          for (AddFile file : deltaFiles) {
+            String fullPath = filePattern + separator + file.getPath();
+            MatchResult match = FileSystems.match(fullPath, matchConfiguration.getEmptyMatchTreatment());
+            // LOG.info("DeltaFileIO.MatchPollFn will process {}", match);
+            for (MatchResult.Metadata metadata : match.metadata()) {
+              filesMetadata.add(metadata);
+            }
+          }
+
+          deltaSnapshotVersion = deltaSnapshot.getVersion();
+          LOG.info("DeltaFileIO.MatchPollFn: for deltaSnapshotVersion={} updating filesMetadata.size={}",

Review comment:
       Maybe worth making this `debug`?
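For context on the logging question: in `MatchPollFn`, both info-level lines sit in the branch that runs only when `deltaSnapshot.getVersion()` differs from the last version seen, so they fire only on polls that observe a new table version, not on every poll. The sketch below is a simplified, hypothetical stdlib model of that version gating, with a filename set standing in for the deduplication that `Watch.growthOf(..., new ExtractFilenameFn())` presumably performs downstream; `VersionGatedPoller` is not a class in the PR.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical model of MatchPollFn's behaviour: re-list files only when the snapshot version
 * changes, and drop filenames that were already emitted (standing in for Watch's dedup).
 */
public class VersionGatedPoller {
  private long lastVersion = -1; // same sentinel as deltaSnapshotVersion in the PR
  private final Set<String> emitted = new HashSet<>();

  /** Returns the files newly emitted by a poll that observed {@code version} and {@code allFiles}. */
  List<String> poll(long version, List<String> allFiles) {
    List<String> newFiles = new ArrayList<>();
    if (version == lastVersion) {
      return newFiles; // unchanged snapshot: no listing, so nothing to log either
    }
    lastVersion = version;
    for (String file : allFiles) {
      if (emitted.add(file)) { // add() returns false for an already-seen filename
        newFiles.add(file);
      }
    }
    return newFiles;
  }

  public static void main(String[] args) {
    VersionGatedPoller poller = new VersionGatedPoller();
    System.out.println(poller.poll(0, List.of("a.parquet")));              // [a.parquet]
    System.out.println(poller.poll(0, List.of("a.parquet")));              // []
    System.out.println(poller.poll(1, List.of("a.parquet", "b.parquet"))); // [b.parquet]
  }
}
```

Because each new version re-lists every file in the snapshot, the per-version log volume is modest, but the file count it reports can be large for big tables, which is one argument for `debug`.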

##########
File path: sdks/java/io/deltalake/src/main/java/org/apache/beam/sdk/io/DeltaFileIO.java
##########
@@ -0,0 +1,653 @@
+package org.apache.beam.sdk.io;
+
+import com.google.auto.value.AutoValue;
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.Snapshot;
+import io.delta.standalone.actions.AddFile;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Contextful;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Requirements;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Transforms for accessing Delta Lake files: listing files (matching) and reading.
+ *
+ * <h2>Getting a snapshot</h2>
+ *
+ * <p>{@link #snapshot} and {@link #matchAll} match filepatterns (respectively either a single
+ * filepattern or a {@link PCollection} thereof) and return the files that match them as {@link
+ * PCollection PCollections} of {@link MatchResult.Metadata}. Configuration options for them are in
+ * {@link MatchConfiguration} and include features such as treatment of filepatterns that don't
+ * match anything and continuous incremental matching of filepatterns (watching for new files).
+ *
+ * <h3>Example: Watching a Delta Lake table for new files</h3>
+ *
+ * <p>This example matches a single filepattern repeatedly every 30 seconds, continuously returns
+ * new matched files as an unbounded {@code PCollection<Metadata>} and stops if no new files appear
+ * for 1 hour.
+ *
+ * <pre>{@code
+ * PCollection<Metadata> matches = p.apply(DeltaFileIO.snapshot()
+ *     .filepattern("...")
+ *     .continuously(
+ *       Duration.standardSeconds(30), afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
+ * <h3>Example: Matching a Delta Lake table at a specific version</h3>
+ *
+ * <p>This example matches the Parquet files that make up version 5 of a Delta Lake table.
+ *
+ * <pre>{@code
+ * PCollection<Metadata> matches = p.apply(DeltaFileIO.snapshot()
+ *     .filepattern("...")
+ *     .withVersion(5L));
+ * }</pre>
+ *
+ * <h2>Reading files</h2>
+ *
+ * <p>{@link #readMatches} converts each result of {@link #snapshot} or {@link #matchAll} to a {@link
+ * ReadableFile} that is convenient for reading a file's contents, optionally decompressing it.
+ *
+ * <h3>Example: Reading Parquet files from the latest Delta Lake snapshot</h3>
+ *
+ * <pre>{@code
+     pipeline
+      .apply("Get Snapshot",
+        DeltaFileIO.snapshot()
+            .filepattern(filePattern)
+            .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW)
+      )
+      .apply("Read matched files",
+            DeltaFileIO.readMatches()
+      )
+      .apply("Read parquet files",
+          ParquetIO.readFiles(schema) // schema: Avro Schema of the table's records
+      );
+ * }</pre>
+ *
+ */
+
+@SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class DeltaFileIO
+{
+  private static final Logger LOG = LoggerFactory.getLogger(DeltaFileIO.class);
+
+  /**
+   * Processes a filepattern using Delta Standalone and produces a collection of Parquet files
+   * as {@link MatchResult.Metadata}.
+   *
+   * <p>By default, matches the filepattern once and produces a bounded {@link PCollection}. To
+   * continuously watch the filepattern for new matches, use {@link MatchAll#continuously(Duration,
+   * TerminationCondition)} - this will produce an unbounded {@link PCollection}.
+   *
+   * <p>By default, a filepattern matching no resources is treated according to {@link
+   * EmptyMatchTreatment#DISALLOW}. To configure this behavior, use {@link
+   * Match#withEmptyMatchTreatment}.
+   *
+   * <p>Returned {@link MatchResult.Metadata} are deduplicated by filename. For example, if this
+   * transform observes a file with the same name several times with different metadata (e.g.
+   * because the file is growing), it will emit the metadata the first time this file is observed,
+   * and will ignore future changes to this file.
+   */
+  public static Match snapshot() {
+    return new AutoValue_DeltaFileIO_Match.Builder()
+        .setConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .build();
+  }
+
+  /**
+   * Like {@link #snapshot}, but matches each filepattern in a collection of filepatterns.
+   *
+   * <p>Resources are not deduplicated between filepatterns, i.e. if the same resource matches
+   * multiple filepatterns, it will be produced multiple times.
+   *
+   * <p>By default, a filepattern matching no resources is treated according to {@link
+   * EmptyMatchTreatment#ALLOW_IF_WILDCARD}. To configure this behavior, use {@link
+   * MatchAll#withEmptyMatchTreatment}.
+   */
+  public static MatchAll matchAll() {
+    return new AutoValue_DeltaFileIO_MatchAll.Builder()
+        .setConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .build();
+  }
+
+  /**
+   * Converts each result of {@link #snapshot} or {@link #matchAll} to a {@link ReadableFile} which can
+   * be used to read the contents of each file, optionally decompressing it.
+   */
+  public static ReadMatches readMatches() {
+    return new AutoValue_DeltaFileIO_ReadMatches.Builder()
+        .setCompression(Compression.AUTO)
+        .setDirectoryTreatment(ReadMatches.DirectoryTreatment.SKIP)
+        .build();
+  }
+
+
+  /**
+   * Describes configuration for matching filepatterns, such as {@link EmptyMatchTreatment} and
+   * continuous watching for matching files.
+   */
+  @AutoValue
+  public abstract static class MatchConfiguration implements HasDisplayData, Serializable {
+    /** Creates a {@link MatchConfiguration} with the given {@link EmptyMatchTreatment}. */
+    public static MatchConfiguration create(EmptyMatchTreatment emptyMatchTreatment) {
+      return new AutoValue_DeltaFileIO_MatchConfiguration.Builder()
+          .setEmptyMatchTreatment(emptyMatchTreatment)
+          .build();
+    }
+
+    public abstract EmptyMatchTreatment getEmptyMatchTreatment();
+
+    public abstract @Nullable Duration getWatchInterval();
+
+    abstract @Nullable TerminationCondition<String, ?> getWatchTerminationCondition();
+
+    abstract @Nullable Long getVersion();
+
+    abstract @Nullable Long getTimestamp();
+
+    abstract @Nullable SerializableConfiguration getHadoopConfiguration();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
+
+      abstract Builder setWatchInterval(Duration watchInterval);
+
+      abstract Builder setWatchTerminationCondition(TerminationCondition<String, ?> condition);
+
+      abstract Builder setVersion(Long version);
+
+      abstract Builder setTimestamp(Long timestamp);
+
+      abstract Builder setHadoopConfiguration(SerializableConfiguration hadoopConfiguration);
+
+      abstract MatchConfiguration build();
+    }
+
+    /** Sets the {@link EmptyMatchTreatment}. */
+    public MatchConfiguration withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
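+    /**
+     * Reads the table snapshot at the given version. If set, takes precedence over {@link
+     * #withTimestamp}.
+     */
+    public MatchConfiguration withVersion(Long version) {
+      return toBuilder().setVersion(version).build();
+    }
+
+    /**
+     * Reads the latest table snapshot committed at or before the given timestamp, in milliseconds
+     * since the Unix epoch.
+     */
+    public MatchConfiguration withTimestamp(Long timestamp) {
+      return toBuilder().setTimestamp(timestamp).build();
+    }
+
+    /** Sets the Hadoop {@link Configuration} used by Delta Standalone to access the table. */
+    public MatchConfiguration withHadoopConfiguration(SerializableConfiguration hadoopConfiguration) {
+      return toBuilder().setHadoopConfiguration(hadoopConfiguration).build();
+    }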
+
+    /**
+     * Continuously watches for new files at the given interval until the given termination
+     * condition is reached, where the input to the condition is the filepattern.
+     */
+    public MatchConfiguration continuously(
+        Duration interval, TerminationCondition<String, ?> condition) {
+      return toBuilder().setWatchInterval(interval).setWatchTerminationCondition(condition).build();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder
+          .add(
+              DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+                  .withLabel("Treatment of filepatterns that match no files"))
+          .addIfNotNull(
+              DisplayData.item("watchForNewFilesInterval", getWatchInterval())
+                  .withLabel("Interval to watch for new files"));
+    }
+  }
+
+  /** Implementation of {@link #snapshot}. */
+  @AutoValue
+  public abstract static class Match extends PTransform<PBegin, PCollection<MatchResult.Metadata>>
+  {
+    abstract @Nullable ValueProvider<String> getFilepattern();
+
+    abstract MatchConfiguration getConfiguration();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setFilepattern(ValueProvider<String> filepattern);
+
+      abstract Builder setConfiguration(MatchConfiguration configuration);
+
+      abstract Match build();
+    }
+
+    /** Matches the given filepattern. */
+    public Match filepattern(String filepattern) {
+      return this.filepattern(StaticValueProvider.of(filepattern));
+    }
+
+    /** Like {@link #filepattern(String)} but using a {@link ValueProvider}. */
+    public Match filepattern(ValueProvider<String> filepattern) {

Review comment:
       It may be worth removing the methods that take `ValueProvider` arguments, since these are only used for 'old-style' Dataflow templates. Could you remove the ValueProvider arguments?

##########
File path: sdks/java/io/deltalake/src/main/java/org/apache/beam/sdk/io/DeltaFileIO.java
##########
@@ -0,0 +1,653 @@
+package org.apache.beam.sdk.io;
+
+import com.google.auto.value.AutoValue;
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.Snapshot;
+import io.delta.standalone.actions.AddFile;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Contextful;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Requirements;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Transforms for accessing Delta Lake files: listing files (matching) and reading.
+ *
+ * <h2>Getting a snapshot</h2>
+ *
+ * <p>{@link #snapshot} and {@link #matchAll} take filepatterns (respectively either a single
+ * filepattern or a {@link PCollection} thereof), each pointing at the root of a Delta Lake table,
+ * and return the Parquet data files of the table snapshot as {@link PCollection PCollections} of
+ * {@link MatchResult.Metadata}. Configuration options for them are in {@link MatchConfiguration}
+ * and include the table version or timestamp to read, treatment of filepatterns that don't match
+ * anything, and continuous incremental matching of filepatterns (watching for new files).
+ *
+ * <h3>Example: Watching a Delta Lake table for new files</h3>
+ *
+ * <p>This example matches a single filepattern repeatedly every 30 seconds, continuously returns
+ * new matched files as an unbounded {@code PCollection<Metadata>}, and stops if no new files appear
+ * for 1 hour.
+ *
+ * <pre>{@code
+ * PCollection<Metadata> matches = p.apply(DeltaFileIO.snapshot()
+ *     .filepattern("...")
+ *     .continuously(
+ *       Duration.standardSeconds(30), afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
+ * <h3>Example: Matching a Delta Lake table at a specific version</h3>
+ *
+ * <p>This example matches the Parquet files that make up version 5 of a Delta Lake table.
+ *
+ * <pre>{@code
+ * PCollection<Metadata> matches = p.apply(DeltaFileIO.snapshot()
+ *     .filepattern("...")
+ *     .withVersion(5L));
+ * }</pre>
+ *
+ * <h2>Reading files</h2>
+ *
+ * <p>{@link #readMatches} converts each result of {@link #snapshot} or {@link #matchAll} to a {@link
+ * ReadableFile} that is convenient for reading a file's contents, optionally decompressing it.
+ *
+ * <h3>Example: Reading Parquet records from the latest snapshot of a Delta Lake table</h3>
+ *
+ * <pre>{@code
+ * pipeline
+ *     .apply("Get Snapshot",
+ *         DeltaFileIO.snapshot()
+ *             .filepattern(filePattern)
+ *             .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
+ *     .apply("Read matched files", DeltaFileIO.readMatches())
+ *     .apply("Read parquet files", ParquetIO.readFiles(<SCHEMA>));
+ * }</pre>
+ *
+ */
+
+@SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class DeltaFileIO
+{
+  private static final Logger LOG = LoggerFactory.getLogger(DeltaFileIO.class);
+
+  /**
+   * Processes a filepattern (the root path of a Delta Lake table) with Delta Standalone and
+   * produces the Parquet data files of the selected table snapshot as {@link MatchResult.Metadata}.
+   *
+   * <p>By default, matches the filepattern once and produces a bounded {@link PCollection}. To
+   * continuously watch the filepattern for new matches, use {@link Match#continuously(Duration,
+   * TerminationCondition)}; this produces an unbounded {@link PCollection}.
+   *
+   * <p>By default, a filepattern matching no resources is treated according to {@link
+   * EmptyMatchTreatment#DISALLOW}. To configure this behavior, use {@link
+   * Match#withEmptyMatchTreatment}.
+   *
+   * <p>Returned {@link MatchResult.Metadata} are deduplicated by filename. For example, if this
+   * transform observes a file with the same name several times with different metadata (e.g.
+   * because the file is growing), it will emit the metadata the first time this file is observed,
+   * and will ignore future changes to this file.
+   */
+  public static Match snapshot() {
+    return new AutoValue_DeltaFileIO_Match.Builder()
+        .setConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .build();
+  }
+
+  /**
+   * Like {@link #snapshot}, but matches each filepattern in a collection of filepatterns.
+   *
+   * <p>Resources are not deduplicated between filepatterns, i.e. if the same resource matches
+   * multiple filepatterns, it will be produced multiple times.
+   *
+   * <p>By default, a filepattern matching no resources is treated according to {@link
+   * EmptyMatchTreatment#ALLOW_IF_WILDCARD}. To configure this behavior, use {@link
+   * MatchAll#withEmptyMatchTreatment}.
+   */
+  public static MatchAll matchAll() {
+    return new AutoValue_DeltaFileIO_MatchAll.Builder()
+        .setConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .build();
+  }
+
+  /**
+   * Converts each result of {@link #snapshot} or {@link #matchAll} to a {@link ReadableFile} which can
+   * be used to read the contents of each file, optionally decompressing it.
+   */
+  public static ReadMatches readMatches() {

Review comment:
       Is there any special treatment of files that requires a dedicated `readMatches` implementation in `DeltaFileIO`? Or could we just use ParquetIO for it?

##########
File path: sdks/java/io/deltalake/src/main/java/org/apache/beam/sdk/io/DeltaFileIO.java
##########
@@ -0,0 +1,653 @@
+    /** Like {@link #filepattern(String)} but using a {@link ValueProvider}. */
+    public Match filepattern(ValueProvider<String> filepattern) {
+      return toBuilder().setFilepattern(filepattern).build();
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public Match withConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setConfiguration(configuration).build();
+    }
+
+    /** See {@link MatchConfiguration#withEmptyMatchTreatment(EmptyMatchTreatment)}. */
+    public Match withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withConfiguration(getConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
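+    /** See {@link MatchConfiguration#withVersion(Long)}. */
+    public Match withVersion(Long version) {
+      return withConfiguration(getConfiguration().withVersion(version));
+    }
+
+    /** See {@link MatchConfiguration#withTimestamp(Long)}. */
+    public Match withTimestamp(Long timestamp) {
+      return withConfiguration(getConfiguration().withTimestamp(timestamp));
+    }
+
+    /**
+     * Sets the Hadoop {@link Configuration} used by Delta Standalone to access the table. See
+     * {@link MatchConfiguration#withHadoopConfiguration(SerializableConfiguration)}.
+     */
+    public Match withHadoopConfiguration(Configuration hadoopConfiguration) {
+      return withConfiguration(getConfiguration()
+          .withHadoopConfiguration(new SerializableConfiguration(hadoopConfiguration)));
+    }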
+
+    /**
+     * See {@link MatchConfiguration#continuously}. The returned {@link PCollection} is unbounded.
+     *
+     * <p>This works only in runners supporting splittable {@link
+     * org.apache.beam.sdk.transforms.DoFn}.
+     */
+    public Match continuously(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return withConfiguration(getConfiguration().continuously(pollInterval, terminationCondition));
+    }
+
+    @Override
+    public PCollection<MatchResult.Metadata> expand(PBegin input) {
+      return input
+          .apply(
+              "Delta Create Filepattern",
+              Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+          .apply("Delta Via MatchAll", matchAll().withConfiguration(getConfiguration()));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(
+              DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
+          .include("configuration", getConfiguration());
+    }
+  }
+
+  /** Implementation of {@link #matchAll}. */
+  @AutoValue
+  public abstract static class MatchAll
+      extends PTransform<PCollection<String>, PCollection<MatchResult.Metadata>>
+  {
+    private static final Logger logger = LoggerFactory.getLogger(MatchAll.class);
+
+    abstract MatchConfiguration getConfiguration();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setConfiguration(MatchConfiguration configuration);
+
+      abstract MatchAll build();
+    }
+
+    /** Like {@link Match#withConfiguration}. */
+    public MatchAll withConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setConfiguration(configuration).build();
+    }
+
+    /** Like {@link Match#withEmptyMatchTreatment}. */
+    public MatchAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withConfiguration(getConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /** Like {@link Match#continuously}. */
+    public MatchAll continuously(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return withConfiguration(getConfiguration().continuously(pollInterval, terminationCondition));
+    }
+
+    @Override
+    public PCollection<MatchResult.Metadata> expand(PCollection<String> input)
+    {
+      logger.info("matchAll config: {}", getConfiguration());
+
+      PCollection<MatchResult.Metadata> res;
+      if (getConfiguration().getWatchInterval() == null) {
+        res =
+            input.apply(
+                "Delta MatchAll FilePatterns",
+                ParDo.of(new MatchOnceFn(getConfiguration())));
+      } else {
+        res =
+            input.apply(
+                    "Delta Continuously MatchAll filePatterns",
+                    Watch.growthOf(
+                            Contextful.of(new MatchPollFn(getConfiguration()), Requirements.empty()),
+                            new ExtractFilenameFn())
+                        .withPollInterval(getConfiguration().getWatchInterval())
+                        .withTerminationPerInput(getConfiguration().getWatchTerminationCondition()))
+                .apply(Values.create());
+      }
+      return res.apply("Delta Reshuffle files", Reshuffle.viaRandomKey());
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.include("configuration", getConfiguration());
+    }
+
+    private static class MatchOnceFn extends DoFn<String, MatchResult.Metadata>
+    {
+      private final MatchConfiguration matchConfiguration;
+
+      MatchOnceFn(MatchConfiguration matchConfiguration) {
+        this.matchConfiguration = matchConfiguration;
+      }
+
+      @ProcessElement
+      public void process(ProcessContext c) throws Exception
+      {
+        LOG.info("DeltaFileIO:MatchOnceFn processing {}", matchConfiguration);
+
+        String filePattern = c.element();
+
+        Configuration hadoopConfiguration = (matchConfiguration.getHadoopConfiguration() != null) ?
+            matchConfiguration.getHadoopConfiguration().get() : new Configuration();
+
+        DeltaLog deltaLog = getDeltaLog(filePattern, hadoopConfiguration);
+        Snapshot deltaSnapshot = getDeltaSnapshot(deltaLog, matchConfiguration);
+
+        List<AddFile> deltaFiles = deltaSnapshot.getAllFiles();
+
+        LOG.info("DeltaFileIO:MatchOnceFn DeltaLog.forTable: version={}, numberOfFiles={}",
+            deltaSnapshot.getVersion(), deltaFiles.size());
+
+        String separator = filePattern.endsWith("/") ? "" : "/";
+
+        for (AddFile file : deltaFiles) {
+          String fullPath = filePattern + separator + file.getPath();
+          MatchResult match = FileSystems.match(fullPath, matchConfiguration.getEmptyMatchTreatment());
+          LOG.info("DeltaFileIO will process {}", match);
+          for (MatchResult.Metadata metadata : match.metadata()) {
+            c.output(metadata);
+          }
+        }
+      }
+    }
+
+    static DeltaLog getDeltaLog(String filePattern, Configuration hadoopConfiguration)
+    {
+      // Delta Standalone accesses storage through the Hadoop FileSystem API, which handles S3 via
+      // the s3a:// scheme, so rewrite a leading s3:// to s3a://.
+      String deltaFilePattern = (filePattern.startsWith("s3://")) ?
+          filePattern.replaceFirst("s3://", "s3a://") : filePattern;
+      LOG.info("DeltaFileIO trying DeltaLog.forTable for pattern {}", deltaFilePattern);
+
+      DeltaLog deltaLog = DeltaLog.forTable(hadoopConfiguration, deltaFilePattern);
+      return deltaLog;
+    }
+
+    static Snapshot getDeltaSnapshot(DeltaLog deltaLog, MatchConfiguration matchConfiguration)
+    {
+      Snapshot deltaSnapshot;
+
+      if (matchConfiguration.getVersion() != null) {
+        logger.info("DeltaFileIO creating snapshot for version {}", matchConfiguration.getVersion());
+        deltaSnapshot = deltaLog.getSnapshotForVersionAsOf(matchConfiguration.getVersion());
+      } else if (matchConfiguration.getTimestamp() != null) {
+        logger.info("DeltaFileIO creating snapshot for timestamp {}", matchConfiguration.getTimestamp());
+        deltaSnapshot = deltaLog.getSnapshotForTimestampAsOf(matchConfiguration.getTimestamp());
+      } else {
+        logger.info("DeltaFileIO creating snapshot for the latest version");
+        deltaSnapshot = deltaLog.snapshot();
+      }
+
+      return deltaSnapshot;
+    }
+
+    private static class MatchPollFn extends PollFn<String, MatchResult.Metadata>
+    {
+      private final MatchConfiguration matchConfiguration;
+      private final long pollDutationMs;
+
+      private transient DeltaLog deltaLog = null;
+      private transient long deltaSnapshotVersion = -1;
+
+      MatchPollFn(MatchConfiguration matchConfiguration) {
+        this.matchConfiguration = matchConfiguration;
+        pollDutationMs = matchConfiguration.getWatchInterval().getMillis();
+        LOG.info("DeltaFileIO.MatchPollFn created, pollDurationMs={}", pollDutationMs);
+      }
+
+      @Override
+      public Watch.Growth.PollResult<MatchResult.Metadata> apply(String filePattern, Context c) throws Exception
+      {
+        Instant now = Instant.now();
+
+        List<MatchResult.Metadata> filesMetadata;
+
+        Snapshot deltaSnapshot;
+        if (deltaLog == null) {
+          Configuration hadoopConfiguration = (matchConfiguration.getHadoopConfiguration() != null) ?
+              matchConfiguration.getHadoopConfiguration().get() : new Configuration();
+          deltaLog = getDeltaLog(filePattern, hadoopConfiguration);
+
+          deltaSnapshot = getDeltaSnapshot(deltaLog, matchConfiguration);
+        } else {
+          deltaSnapshot = deltaLog.update();
+        }
+
+        if (deltaSnapshot.getVersion() == deltaSnapshotVersion) {
+          // LOG.info("DeltaFileIO.MatchPollFn: No update for deltaSnapshotVersion={}", deltaSnapshotVersion);

Review comment:
       I'm confused by the version logic here. Do we only output data if we see a new version of the table? So as long as the table doesn't change, we don't output anything?
   
   If that's the intention, I am not sure that this implementation will work, because a polling function may be rescheduled to a new worker, and the version history/variable will be reset to -1.
   
   Let me know if I misunderstood the code.
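A minimal, stdlib-only Java sketch of the concern above, assuming the runner ships the `PollFn` to workers via Java serialization (as Beam runners typically do for user functions). It is not code from this PR; `TransientStateDemo`, `VersionTrackingFn`, `roundTrip`, and the table path are hypothetical names. A `transient` field is not serialized, and standard Java deserialization does not re-run field initializers, so per-instance state like `deltaSnapshotVersion` does not survive the round trip. In plain Java serialization the field comes back as the JVM default `0` rather than `-1`, so a freshly deserialized instance would treat version 0 as already seen.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientStateDemo {

  // Hypothetical stand-in for MatchPollFn: remembers the last table version in a transient field.
  static class VersionTrackingFn implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String tablePath;
    // This initializer runs when the constructor is called, but not on deserialization.
    private transient long lastSeenVersion = -1;

    VersionTrackingFn(String tablePath) {
      this.tablePath = tablePath;
    }

    /** Returns true if currentVersion differs from the last version this instance observed. */
    boolean isNewVersion(long currentVersion) {
      boolean changed = currentVersion != lastSeenVersion;
      lastSeenVersion = currentVersion;
      return changed;
    }

    long lastSeenVersion() {
      return lastSeenVersion;
    }
  }

  /** Serializes and deserializes a value, similar to shipping a function to another worker. */
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    VersionTrackingFn fn = new VersionTrackingFn("s3a://bucket/table"); // hypothetical path
    System.out.println(fn.isNewVersion(7)); // prints true: -1 -> 7

    VersionTrackingFn copy = roundTrip(fn);
    System.out.println(copy.lastSeenVersion()); // prints 0: neither 7 nor -1 survives

    VersionTrackingFn fresh = roundTrip(new VersionTrackingFn("s3a://bucket/table"));
    System.out.println(fresh.isNewVersion(0)); // prints false: version 0 looks already seen
  }
}
```

One way around this is to avoid relying on per-instance state for deduplication at all, for example by letting `Watch` deduplicate outputs by key, since the PR already passes `ExtractFilenameFn` as the output key function.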




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org