You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2022/07/13 20:11:40 UTC

[beam] branch master updated: Merge pull request #15786: Add gap-filling transform for timeseries

This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b78a080e932 Merge pull request #15786: Add gap-filling transform for timeseries
b78a080e932 is described below

commit b78a080e93282c83b56a788459090530069033f2
Author: Reuven Lax <re...@google.com>
AuthorDate: Wed Jul 13 13:11:34 2022 -0700

    Merge pull request #15786: Add gap-filling transform for timeseries
---
 .github/autolabeler.yml                            |   1 +
 build.gradle.kts                                   |   1 +
 .../org/apache/beam/sdk/coders/SortedMapCoder.java | 197 ++++++++
 .../beam/sdk/schemas/transforms/WithKeys.java      |  79 +++
 sdks/java/extensions/timeseries/build.gradle       |  32 ++
 .../beam/sdk/extensions/timeseries/FillGaps.java   | 535 +++++++++++++++++++++
 .../sdk/extensions/timeseries/package-info.java    |  20 +
 .../sdk/extensions/timeseries/FillGapsTest.java    | 355 ++++++++++++++
 settings.gradle.kts                                |   1 +
 9 files changed, 1221 insertions(+)

diff --git a/.github/autolabeler.yml b/.github/autolabeler.yml
index 715abeddecf..888c5d4f3a4 100644
--- a/.github/autolabeler.yml
+++ b/.github/autolabeler.yml
@@ -42,6 +42,7 @@ extensions: ["sdks/java/extensions/**/*", "runners/extensions-java/**/*"]
 "sketching": ["sdks/java/extensions/sketching/**/*"]
 "sorter": ["sdks/java/extensions/sorter/**/*"]
 "sql": ["sdks/java/extensions/sql/**/*"]
+"timeseries": ["sdks/java/extensions/timeseries/**/*"]
 "zetasketch": ["sdks/java/extensions/zetasketch/**/*"]
 
 # IO
diff --git a/build.gradle.kts b/build.gradle.kts
index aa5f8949fa5..7ea18895f77 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -195,6 +195,7 @@ tasks.register("javaPreCommitPortabilityApi") {
 
 tasks.register("javaPostCommit") {
   dependsOn(":sdks:java:extensions:google-cloud-platform-core:postCommit")
+  // NOTE(review): confirm that :sdks:java:extensions:timeseries actually provides a postCommit
+  // task; the module's build.gradle added in this commit does not declare one explicitly.
+  dependsOn(":sdks:java:extensions:timeseries:postCommit")
   dependsOn(":sdks:java:extensions:zetasketch:postCommit")
   dependsOn(":sdks:java:extensions:ml:postCommit")
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SortedMapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SortedMapCoder.java
new file mode 100644
index 00000000000..8448c915654
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SortedMapCoder.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+
+/**
+ * A {@link Coder} for {@link SortedMap Maps} that encodes them according to provided coders for
+ * keys and values.
+ *
+ * @param <K> the type of the keys of the KVs being transcoded
+ * @param <V> the type of the values of the KVs being transcoded
+ */
+public class SortedMapCoder<K extends Comparable<? super K>, V>
+    extends StructuredCoder<SortedMap<K, V>> {
+  /** Produces a MapCoder with the given keyCoder and valueCoder. */
+  public static <K extends Comparable<? super K>, V> SortedMapCoder<K, V> of(
+      Coder<K> keyCoder, Coder<V> valueCoder) {
+    return new SortedMapCoder<>(keyCoder, valueCoder);
+  }
+
+  public Coder<K> getKeyCoder() {
+    return keyCoder;
+  }
+
+  public Coder<V> getValueCoder() {
+    return valueCoder;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  private Coder<K> keyCoder;
+  private Coder<V> valueCoder;
+
+  private SortedMapCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
+    this.keyCoder = keyCoder;
+    this.valueCoder = valueCoder;
+  }
+
+  @Override
+  public void encode(SortedMap<K, V> map, OutputStream outStream)
+      throws IOException, CoderException {
+    encode(map, outStream, Context.NESTED);
+  }
+
+  @Override
+  public void encode(SortedMap<K, V> map, OutputStream outStream, Context context)
+      throws IOException, CoderException {
+    if (map == null) {
+      throw new CoderException("cannot encode a null SortedMap");
+    }
+    DataOutputStream dataOutStream = new DataOutputStream(outStream);
+
+    int size = map.size();
+    dataOutStream.writeInt(size);
+    if (size == 0) {
+      return;
+    }
+
+    // Since we handled size == 0 above, entry is guaranteed to exist before and after loop
+    Iterator<Entry<K, V>> iterator = map.entrySet().iterator();
+    Entry<K, V> entry = iterator.next();
+    while (iterator.hasNext()) {
+      keyCoder.encode(entry.getKey(), outStream);
+      valueCoder.encode(entry.getValue(), outStream);
+      entry = iterator.next();
+    }
+
+    keyCoder.encode(entry.getKey(), outStream);
+    valueCoder.encode(entry.getValue(), outStream, context);
+    // no flush needed as DataOutputStream does not buffer
+  }
+
+  @Override
+  public SortedMap<K, V> decode(InputStream inStream) throws IOException, CoderException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
+  public SortedMap<K, V> decode(InputStream inStream, Context context)
+      throws IOException, CoderException {
+    DataInputStream dataInStream = new DataInputStream(inStream);
+    int size = dataInStream.readInt();
+    if (size == 0) {
+      return Collections.emptySortedMap();
+    }
+
+    SortedMap<K, V> retval = Maps.newTreeMap();
+    for (int i = 0; i < size - 1; ++i) {
+      K key = keyCoder.decode(inStream);
+      V value = valueCoder.decode(inStream);
+      retval.put(key, value);
+    }
+
+    K key = keyCoder.decode(inStream);
+    V value = valueCoder.decode(inStream, context);
+    retval.put(key, value);
+    return retval;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return a {@link List} containing the key coder at index 0 at the and value coder at index 1.
+   */
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return Arrays.asList(keyCoder, valueCoder);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws NonDeterministicException always. Not all maps have a deterministic encoding. For
+   *     example, {@code HashMap} comparison does not depend on element order, so two {@code
+   *     HashMap} instances may be equal but produce different encodings.
+   */
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    throw new NonDeterministicException(
+        this, "Ordering of entries in a Map may be non-deterministic.");
+  }
+
+  @Override
+  public boolean consistentWithEquals() {
+    return keyCoder.consistentWithEquals() && valueCoder.consistentWithEquals();
+  }
+
+  @Override
+  public Object structuralValue(SortedMap<K, V> value) {
+    if (consistentWithEquals()) {
+      return value;
+    } else {
+      Map<Object, Object> ret = Maps.newHashMapWithExpectedSize(value.size());
+      for (Map.Entry<K, V> entry : value.entrySet()) {
+        ret.put(
+            keyCoder.structuralValue(entry.getKey()), valueCoder.structuralValue(entry.getValue()));
+      }
+      return ret;
+    }
+  }
+
+  @Override
+  public void registerByteSizeObserver(SortedMap<K, V> map, ElementByteSizeObserver observer)
+      throws Exception {
+    observer.update(4L);
+    if (map.isEmpty()) {
+      return;
+    }
+    Iterator<Entry<K, V>> entries = map.entrySet().iterator();
+    Entry<K, V> entry = entries.next();
+    while (entries.hasNext()) {
+      keyCoder.registerByteSizeObserver(entry.getKey(), observer);
+      valueCoder.registerByteSizeObserver(entry.getValue(), observer);
+      entry = entries.next();
+    }
+    keyCoder.registerByteSizeObserver(entry.getKey(), observer);
+    valueCoder.registerByteSizeObserver(entry.getValue(), observer);
+  }
+
+  @Override
+  public TypeDescriptor<SortedMap<K, V>> getEncodedTypeDescriptor() {
+    return new TypeDescriptor<SortedMap<K, V>>() {}.where(
+            new TypeParameter<K>() {}, keyCoder.getEncodedTypeDescriptor())
+        .where(new TypeParameter<V>() {}, valueCoder.getEncodedTypeDescriptor());
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/WithKeys.java
new file mode 100644
index 00000000000..d057b75c677
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/WithKeys.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.utils.RowSelector;
+import org.apache.beam.sdk.schemas.utils.SelectHelpers;
+import org.apache.beam.sdk.schemas.utils.SelectHelpers.RowSelectorContainer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A {@link PTransform} that keys each element of a schema-bearing {@link PCollection} by a {@link
+ * Row} holding the fields selected by a {@link FieldAccessDescriptor}.
+ *
+ * <p>The output values are the original, unmodified input elements. The key {@link Row} uses the
+ * schema obtained by selecting the requested fields from the input schema, and the output coder is
+ * a {@link KvCoder} of two {@link SchemaCoder SchemaCoders}.
+ *
+ * @param <T> the element type of the input {@link PCollection}
+ */
+public class WithKeys<T> extends PTransform<PCollection<T>, PCollection<KV<Row, T>>> {
+  private final FieldAccessDescriptor fieldAccessDescriptor;
+
+  /**
+   * Creates a transform that keys elements by the fields described by {@code
+   * fieldAccessDescriptor}. The descriptor is resolved against the input schema when the transform
+   * is expanded.
+   */
+  public static <T> WithKeys<T> of(FieldAccessDescriptor fieldAccessDescriptor) {
+    return new WithKeys<>(fieldAccessDescriptor);
+  }
+
+  private WithKeys(FieldAccessDescriptor fieldAccessDescriptor) {
+    this.fieldAccessDescriptor = fieldAccessDescriptor;
+  }
+
+  @Override
+  public PCollection<KV<Row, T>> expand(PCollection<T> input) {
+    Schema schema = input.getSchema();
+    TypeDescriptor<T> typeDescriptor = input.getTypeDescriptor();
+    if (typeDescriptor == null) {
+      throw new RuntimeException("Null type descriptor on input.");
+    }
+    SerializableFunction<T, Row> toRowFunction = input.getToRowFunction();
+    SerializableFunction<Row, T> fromRowFunction = input.getFromRowFunction();
+
+    FieldAccessDescriptor resolved = fieldAccessDescriptor.resolve(schema);
+    RowSelector rowSelector = new RowSelectorContainer(schema, resolved, true);
+    Schema keySchema = SelectHelpers.getOutputSchema(schema, resolved);
+
+    return input
+        .apply("selectKeys", ParDo.of(new SelectKeysFn<T>(rowSelector)))
+        .setCoder(
+            KvCoder.of(
+                SchemaCoder.of(keySchema),
+                SchemaCoder.of(schema, typeDescriptor, toRowFunction, fromRowFunction)));
+  }
+
+  /**
+   * Emits {@code KV(selectedKeyRow, originalElement)} for each input element. This is a static
+   * nested class, rather than an anonymous class inside {@code expand}, so the serialized DoFn does
+   * not capture the enclosing transform.
+   */
+  private static class SelectKeysFn<T> extends DoFn<T, KV<Row, T>> {
+    private final RowSelector rowSelector;
+
+    SelectKeysFn(RowSelector rowSelector) {
+      this.rowSelector = rowSelector;
+    }
+
+    @ProcessElement
+    public void process(
+        @Element Row row, // Beam will convert the element to a row.
+        @Element T element, // Beam will return the original element.
+        OutputReceiver<KV<Row, T>> o) {
+      o.output(KV.of(rowSelector.select(row), element));
+    }
+  }
+}
diff --git a/sdks/java/extensions/timeseries/build.gradle b/sdks/java/extensions/timeseries/build.gradle
new file mode 100644
index 00000000000..fd8b961b0ad
--- /dev/null
+++ b/sdks/java/extensions/timeseries/build.gradle
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+// Presumably applies Beam's shared Java module configuration from the plugin above;
+// automaticModuleName presumably sets the JPMS Automatic-Module-Name - confirm in the plugin.
+applyJavaNature(
+  automaticModuleName: 'org.apache.beam.sdk.extensions.timeseries'
+)
+
+description = "Apache Beam :: SDKs :: Java :: Extensions :: Timeseries"
+
+dependencies {
+  implementation library.java.vendored_guava_26_0_jre
+  implementation library.java.joda_time
+  implementation project(path: ":sdks:java:core", configuration: "shadow")
+  testImplementation library.java.junit
+  // The direct runner is only on the test runtime classpath (presumably to run FillGapsTest).
+  testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
+}
diff --git a/sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/FillGaps.java b/sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/FillGaps.java
new file mode 100644
index 00000000000..1e33f292b83
--- /dev/null
+++ b/sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/FillGaps.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.timeseries;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.SortedMapCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.transforms.WithKeys;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableBiFunction;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Fill gaps in timeseries. Values are expected to have Beam schemas registered.
+ *
+ * <p>This transform views the original PCollection as a collection of timeseries, each with a different key. The
+ * key to be used and the timeseries bucket size are both specified in the {@link #of} creation method. Multiple
+ * fields can be specified for the key - the key extracted will be a composite of all of them. Any elements in the
+ * original {@link PCollection} will appear unchanged in the output PCollection, with timestamp and window unchanged.
+ * Any gaps in timeseries (i.e. buckets with no elements) will be filled in the output PCollection with a single element
+ * (by default the latest element seen or propagated into the previous bucket). The timestamp of the filled element is
+ * the end of the bucket, and the original PCollection's window function is used to assign it to a window.
+ *
+ *
+ * <p>Example usage: the following code views each user,country pair in the input {@link PCollection} as a timeseries
+ * with bucket size one second. If any of these timeseries has a bucket with no elements, then the latest element from
+ * the previous bucket (i.e. the one with the largest timestamp) will be propagated into the missing bucket. If there
+ * are multiple missing buckets, then they all will be filled up to 1 hour - the maximum gap size specified in
+ * {@link #withMaxGapFillBuckets}.
+ *
+ * <pre>{@code PCollection<MyType> input = readInput();
+ * PCollection<MyType> gapFilled =
+ *   input.apply("fillGaps",
+ *      FillGaps.of(Duration.standardSeconds(1), "userId", "country")
+ *        .withMaxGapFillBuckets(3600L));
+ *  gapFilled.apply(MySink.create());
+ *     }</pre>
+ *
+ * <p>By default, the latest element from the previous bucket is propagated into missing buckets. The user can override
+ * this using the {@link #withMergeFunction} method. Several built-in merge functions are provided:
+ * {@link #keepLatest()} (the default), {@link #keepEarliest()}, and {@link #keepNull()}.
+ *
+ * <p>Sometimes elements need to be modified before being propagated into a missing bucket. For example, consider the
+ * following element type containing a timestamp:
+ *
+ * <pre>{@code @DefaultSchema(JavaFieldSchema.class)
+ * class MyType {
+ *   MyData data;
+ *   Instant timestamp;
+ *   @SchemaCreate
+ *   MyType(MyData data, Instant timestamp) {
+ *       this.data = data;
+ *       this.timestamp = timestamp;
+ *   }
+ * }}</pre>
+ *
+ * <p>Each element's timestamp should always be contained in its current timeseries bucket, so the element needs to be
+ * modified when propagated to a new bucket. This can be done using the {@link #withInterpolateFunction} method, as
+ * follows:
+ *
+ * <pre>{@code PCollection<MyType> input = readInput();
+ * PCollection<MyType> gapFilled =
+ *   input.apply("fillGaps",
+ *      FillGaps.of(Duration.standardSeconds(1), "userId", "country")
+ *        .withInterpolateFunction(p -> new MyType(p.getValue().getValue().data, p.getNextWindow().maxTimestamp()))
+ *        .withMaxGapFillBuckets(360L));
+ *  gapFilled.apply(MySink.create());
+ *  }</pre>
+ */
+@AutoValue
+public abstract class FillGaps<ValueT>
+    extends PTransform<PCollection<ValueT>, PCollection<ValueT>> {
+  // Default number of timeseries buckets between state garbage collections. It is passed to
+  // setGcEveryNBuckets in of(), and FillGapsDoFn multiplies the bucket size by it for GC windows.
+  private static final int GC_EVERY_N_BUCKETS = 60;
+
+  /**
+   * Argument to {@link #withMergeFunction}. Always propagates the element with the latest
+   * timestamp; when both timestamps are equal, the second argument is kept.
+   */
+  public static <ValueT>
+      SerializableBiFunction<
+              TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+          keepLatest() {
+    return (first, second) ->
+        second.getTimestamp().isBefore(first.getTimestamp()) ? first : second;
+  }
+
+  /**
+   * Argument to {@link #withMergeFunction}. Always propagates the element with the earliest
+   * timestamp; when both timestamps are equal, the first argument is kept.
+   */
+  public static <ValueT>
+      SerializableBiFunction<
+              TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+          keepEarliest() {
+    return (first, second) ->
+        second.getTimestamp().isBefore(first.getTimestamp()) ? second : first;
+  }
+
+  /**
+   * Argument to the function passed to {@link #withInterpolateFunction}.
+   *
+   * <p>NOTE(review): no factory method is declared here, so instances are presumably created via
+   * the generated {@code AutoValue_FillGaps_InterpolateData} constructor elsewhere in this file.
+   * That constructor takes its parameters in the declaration order of the getters below, so
+   * reordering them would silently swap arguments at such call sites.
+   */
+  @AutoValue
+  public abstract static class InterpolateData<ValueT> {
+    /** The timestamped value being propagated into the gap (see the class-level example). */
+    public abstract TimestampedValue<ValueT> getValue();
+
+    /** Presumably the bucket the value is propagated from - TODO confirm against the caller. */
+    public abstract BoundedWindow getPreviousWindow();
+
+    /** Presumably the gap bucket being filled - TODO confirm against the caller. */
+    public abstract BoundedWindow getNextWindow();
+  }
+
+  /** Size of each timeseries bucket; expand() builds FixedWindows of this size. */
+  abstract Duration getTimeseriesBucketDuration();
+
+  /**
+   * Maximum number of consecutive empty buckets that are filled; {@code Long.MAX_VALUE} unless
+   * overridden.
+   */
+  abstract Long getMaxGapFillBuckets();
+
+  /** Event-time stop time; defaults to {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. */
+  abstract Instant getStopTime();
+
+  /** Schema fields whose values together form the timeseries key. */
+  abstract FieldAccessDescriptor getKeyDescriptor();
+
+  /**
+   * Chooses which of two values in one bucket is propagated onward; {@link #keepLatest()} unless
+   * overridden.
+   */
+  abstract SerializableBiFunction<
+          TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+      getMergeValues();
+
+  /** Number of buckets per garbage-collection window used by {@link FillGapsDoFn}. */
+  abstract int getGcEveryNBuckets();
+
+  /** Optional function that rewrites a value before it fills a gap; null when not configured. */
+  @Nullable
+  abstract SerializableFunction<InterpolateData<ValueT>, ValueT> getInterpolateFunction();
+
+  // Used by the with* methods to derive a modified copy of this transform.
+  abstract Builder<ValueT> toBuilder();
+
+  /**
+   * AutoValue builder for {@link FillGaps}. Each {@code setX} method is bound by AutoValue to the
+   * {@code getX} property of the same name, so names must stay in sync with the getters.
+   */
+  @AutoValue.Builder
+  abstract static class Builder<ValueT> {
+    abstract Builder<ValueT> setTimeseriesBucketDuration(Duration value);
+
+    abstract Builder<ValueT> setMaxGapFillBuckets(Long value);
+
+    abstract Builder<ValueT> setStopTime(Instant value);
+
+    abstract Builder<ValueT> setKeyDescriptor(FieldAccessDescriptor keyDescriptor);
+
+    abstract Builder<ValueT> setMergeValues(
+        SerializableBiFunction<
+                TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+            mergeValues);
+
+    // The only @Nullable property; it may be left unset.
+    abstract Builder<ValueT> setInterpolateFunction(
+        @Nullable SerializableFunction<InterpolateData<ValueT>, ValueT> interpolateFunction);
+
+    abstract Builder<ValueT> setGcEveryNBuckets(int gcEveryNBuckets);
+
+    abstract FillGaps<ValueT> build();
+  }
+
+  /** Construct the transform for the given duration and key fields. */
+  public static <ValueT> FillGaps<ValueT> of(Duration windowDuration, String... keys) {
+    return of(windowDuration, FieldAccessDescriptor.withFieldNames(keys));
+  }
+
+  /**
+   * Construct the transform for the given bucket duration and key fields.
+   *
+   * <p>Defaults: gaps are filled without a bucket limit ({@code Long.MAX_VALUE}), the stop time is
+   * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}, values are merged with {@link #keepLatest()}, no
+   * interpolate function is set, and state is garbage collected every 60 buckets.
+   *
+   * @param windowDuration the size of each timeseries bucket
+   * @param keyDescriptor the schema fields that together form the timeseries key
+   */
+  public static <ValueT> FillGaps<ValueT> of(
+      Duration windowDuration, FieldAccessDescriptor keyDescriptor) {
+    Builder<ValueT> builder = new AutoValue_FillGaps.Builder<ValueT>();
+    return builder
+        .setTimeseriesBucketDuration(windowDuration)
+        .setMaxGapFillBuckets(Long.MAX_VALUE)
+        .setStopTime(BoundedWindow.TIMESTAMP_MAX_VALUE)
+        .setKeyDescriptor(keyDescriptor)
+        .setMergeValues(keepLatest())
+        .setGcEveryNBuckets(GC_EVERY_N_BUCKETS)
+        .build();
+  }
+
+  /* The max gap duration that will be filled. The transform will stop filling timeseries buckets after this duration. */
+  FillGaps<ValueT> withMaxGapFillBuckets(Long value) {
+    return toBuilder().setMaxGapFillBuckets(value).build();
+  }
+
+  /* A hard (event-time) stop time for the transform. */
+  FillGaps<ValueT> withStopTime(Instant stopTime) {
+    return toBuilder().setStopTime(stopTime).build();
+  }
+
+  /**
+   * If there are multiple values in a single timeseries bucket, this function is used to specify
+   * what to propagate to the next bucket. If not specified, then the value with the latest
+   * timestamp will be propagated.
+   */
+  FillGaps<ValueT> withMergeFunction(
+      SerializableBiFunction<
+              TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+          mergeFunction) {
+    return toBuilder().setMergeValues(mergeFunction).build();
+  }
+
+  /**
+   * This function can be used to modify elements before propagating to the next bucket. A common
+   * use case is to modify a contained timestamp to match that of the new bucket.
+   */
+  FillGaps<ValueT> withInterpolateFunction(
+      SerializableFunction<InterpolateData<ValueT>, ValueT> interpolateFunction) {
+    return toBuilder().setInterpolateFunction(interpolateFunction).build();
+  }
+
+  /**
+   * Windows the input into timeseries buckets, keys it by the configured key fields, fills gaps in
+   * the global window with {@link FillGapsDoFn}, and finally re-applies the input's original
+   * {@link WindowFn} to every output element.
+   *
+   * @throws IllegalArgumentException if the input {@link PCollection} has no schema
+   */
+  @Override
+  public PCollection<ValueT> expand(PCollection<ValueT> input) {
+    if (!input.hasSchema()) {
+      throw new IllegalArgumentException("The input to FillGaps must have a schema.");
+    }
+
+    FixedWindows bucketWindows = FixedWindows.of(getTimeseriesBucketDuration());
+    // TODO(reuvenlax, BEAM-12795): We need to create KVs to use state/timers. Once BEAM-12795 is
+    // fixed we can dispense with the KVs here.
+    PCollection<KV<Row, ValueT>> keyedValues =
+        input
+            .apply("FixedWindow", Window.into(bucketWindows))
+            .apply("withKeys", WithKeys.of(getKeyDescriptor()));
+
+    // Every window is a BoundedWindow, and this WindowFn already assigned windows to the input's
+    // ValueT elements, so re-applying it to the (same-typed) output below is safe.
+    @SuppressWarnings("unchecked")
+    WindowFn<ValueT, BoundedWindow> originalWindowFn =
+        (WindowFn<ValueT, BoundedWindow>) input.getWindowingStrategy().getWindowFn();
+    return keyedValues
+        .apply("globalWindow", Window.into(new GlobalWindows()))
+        .apply(
+            "fillGaps",
+            ParDo.of(
+                new FillGapsDoFn<>(
+                    bucketWindows,
+                    input.getCoder(),
+                    getStopTime(),
+                    getMaxGapFillBuckets(),
+                    getMergeValues(),
+                    getInterpolateFunction(),
+                    getGcEveryNBuckets())))
+        .apply("applyOriginalWindow", Window.into(originalWindowFn))
+        .setCoder(input.getCoder());
+  }
+
+  public static class FillGapsDoFn<ValueT> extends DoFn<KV<Row, ValueT>, ValueT> {
+    // Fixed windows defining the timeseries buckets.
+    private final FixedWindows bucketWindows;
+    // Fixed windows spanning gcEveryNBuckets buckets each (bucketWindows.getSize() multiplied by
+    // gcEveryNBuckets in the constructor), presumably used to schedule state garbage collection.
+    private final FixedWindows gcWindows;
+    // The stop time; process() ignores elements whose timestamp is after it.
+    private final Instant stopTime;
+    // The max gap length, in buckets, to fill. Once the gap fill exceeds this, we will stop filling
+    // the gap.
+    private final long maxGapFillBuckets;
+
+    // Picks which of two values in the same bucket is propagated into the following bucket.
+    private final SerializableBiFunction<
+            TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+        mergeValues;
+
+    // Optional rewrite applied to a value before it fills a gap; null when not configured.
+    @Nullable
+    private final SerializableFunction<InterpolateData<ValueT>, ValueT> interpolateFunction;
+
+    // A timer map used to fill potential gaps. Each logical "window" will have a separate timer
+    // which will be cleared if an element arrives in that window. This way the timer will only fire
+    // if there is a gap, at which point it will fill the gap.
+    @TimerFamily("gapTimers")
+    @SuppressWarnings({"UnusedVariable"})
+    private final TimerSpec gapFillingTimersSpec = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    // Timers used to garbage collect state.
+    @TimerFamily("gcTimers")
+    @SuppressWarnings({"UnusedVariable"})
+    private final TimerSpec gcTimersSpec = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    // Keep track of windows already seen. In the future we can replace this with OrderedListState.
+    // Keyed by window end timestamp (which is 1ms greater than the window max timestamp).
+    @StateId("seenBuckets")
+    @SuppressWarnings({"UnusedVariable"})
+    private final StateSpec<ValueState<SortedMap<Instant, TimestampedValue<ValueT>>>>
+        seenBucketsSpec;
+
+    // For every window, keep track of how long the filled gap is, in buckets. If a window was
+    // populated by a received element - i.e. it's not a gap fill - then there is no value in this
+    // map for that window.
+    // Keyed by window end timestamp (which is 1ms greater than the window max timestamp).
+    @StateId("gapDurationMap")
+    @SuppressWarnings({"UnusedVariable"})
+    private final StateSpec<ValueState<SortedMap<Instant, Long>>> gapDurationSpec;
+
+    /**
+     * Creates the gap-filling DoFn.
+     *
+     * @param bucketWindows fixed windows defining the timeseries buckets
+     * @param valueCoder coder for input values, used to build the seenBuckets state coder
+     * @param stopTime elements with timestamps after this are ignored
+     * @param maxGapFillBuckets maximum number of consecutive buckets to fill
+     * @param mergeValues picks the value propagated out of a bucket with several values
+     * @param interpolateFunction optional rewrite applied to filled values; may be null
+     * @param gcEveryNBuckets number of buckets per garbage-collection window
+     */
+    FillGapsDoFn(
+        FixedWindows bucketWindows,
+        Coder<ValueT> valueCoder,
+        Instant stopTime,
+        long maxGapFillBuckets,
+        SerializableBiFunction<
+                TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+            mergeValues,
+        @Nullable SerializableFunction<InterpolateData<ValueT>, ValueT> interpolateFunction,
+        int gcEveryNBuckets) {
+      // Each GC window covers gcEveryNBuckets consecutive buckets.
+      Duration gcWindowSize = bucketWindows.getSize().multipliedBy(gcEveryNBuckets);
+      this.bucketWindows = bucketWindows;
+      this.gcWindows = FixedWindows.of(gcWindowSize);
+      this.stopTime = stopTime;
+      this.maxGapFillBuckets = maxGapFillBuckets;
+      this.mergeValues = mergeValues;
+      this.interpolateFunction = interpolateFunction;
+
+      // Both state maps are keyed by bucket end timestamp.
+      Coder<Instant> keyCoder = InstantCoder.of();
+      this.seenBucketsSpec =
+          StateSpecs.value(SortedMapCoder.of(keyCoder, TimestampedValueCoder.of(valueCoder)));
+      this.gapDurationSpec = StateSpecs.value(SortedMapCoder.of(keyCoder, VarLongCoder.of()));
+    }
+
+    /**
+     * Handles an input element. Elements with timestamps after the stop time are dropped;
+     * otherwise the element is recorded for its bucket via processEvent, and when processEvent
+     * reports true the pending gap-filling timer for that bucket is cleared.
+     */
+    @ProcessElement
+    public void process(
+        @Element KV<Row, ValueT> element,
+        @Timestamp Instant ts,
+        @TimerFamily("gapTimers") TimerMap gapTimers,
+        @TimerFamily("gcTimers") TimerMap gcTimers,
+        @AlwaysFetched @StateId("seenBuckets")
+            ValueState<SortedMap<Instant, TimestampedValue<ValueT>>> seenBuckets,
+        OutputReceiver<ValueT> output) {
+      if (!ts.isAfter(stopTime)) {
+        Instant bucketEnd = bucketWindows.assignWindow(ts).end();
+        // A gap size of -1 is passed for real (non-filled) elements; presumably processEvent
+        // treats a negative size as "not a gap fill" - confirm against its definition.
+        boolean seenData =
+            processEvent(
+                () -> TimestampedValue.of(element.getValue(), ts),
+                bucketEnd,
+                gapTimers,
+                gcTimers,
+                seenBuckets,
+                -1,
+                output);
+        if (seenData) {
+          // Data arrived for this bucket, so cancel any gap-filling timer set for it.
+          gapTimers.get(windowToTimerTag(bucketEnd)).clear();
+        }
+      }
+    }
+
+    /** Encodes a bucket end timestamp as a timer tag: its epoch milliseconds in decimal. */
+    private String windowToTimerTag(Instant endTs) {
+      long endMillis = endTs.getMillis();
+      return String.valueOf(endMillis);
+    }
+
+    /** Inverse of windowToTimerTag: parses decimal epoch milliseconds back into an Instant. */
+    private Instant windowFromTimerTag(String key) {
+      long endMillis = Long.parseLong(key);
+      return new Instant(endMillis);
+    }
+
+    /**
+     * Fills an empty bucket when its gap-filling timer fires.
+     *
+     * <p>The timer id encodes the end timestamp of the bucket to fill. The value is taken from the
+     * immediately preceding bucket, optionally passed through {@code interpolateFunction}, and
+     * emitted at this bucket's max timestamp via {@code processEvent}.
+     *
+     * <p>{@code gapDurations} maps a filled bucket's end timestamp to the number of consecutive
+     * filled buckets ending there. That count is handed to {@code processEvent}, which uses it to
+     * decide whether to keep extending the gap.
+     *
+     * @throws IllegalStateException if the timer fires with no recorded buckets, or before the
+     *     preceding bucket has a value
+     */
+    @OnTimerFamily("gapTimers")
+    public void onTimer(
+        @TimerId String timerId,
+        @Timestamp Instant timestamp,
+        @TimerFamily("gapTimers") TimerMap gapTimers,
+        @TimerFamily("gcTimers") TimerMap gcTimers,
+        @AlwaysFetched @StateId("seenBuckets")
+            ValueState<SortedMap<Instant, TimestampedValue<ValueT>>> seenBuckets,
+        @AlwaysFetched @StateId("gapDurationMap") ValueState<SortedMap<Instant, Long>> gapDurations,
+        OutputReceiver<ValueT> output) {
+      Instant bucketEndTs = windowFromTimerTag(timerId);
+      Instant bucketMaxTs = bucketEndTs.minus(Duration.millis(1));
+      Instant previousBucketEndTs = bucketEndTs.minus(bucketWindows.getSize());
+      Instant previousBucketMaxTs = previousBucketEndTs.minus(Duration.millis(1));
+
+      Map<Instant, TimestampedValue<ValueT>> seenBucketMap = seenBuckets.read();
+      if (seenBucketMap == null) {
+        // processEvent writes seenBuckets before it sets any gap timer, so this is an invariant
+        // violation rather than a recoverable condition.
+        throw new IllegalStateException("Unexpected timer fired with no seenBucketMap.");
+      }
+
+      // Number of consecutive filled buckets ending at the previous bucket (0 if it held real data).
+      @Nullable SortedMap<Instant, Long> gapDurationsMap = gapDurations.read();
+      long gapSize = 0;
+      if (gapDurationsMap != null) {
+        gapSize = gapDurationsMap.getOrDefault(previousBucketEndTs, 0L);
+      }
+      // No element was ever seen for this bucket, so copy the previous bucket's value into it. This
+      // relies on timers firing in timestamp order for a given key: when several consecutive
+      // buckets are empty, the previous one has already been filled by the time we get here.
+      // processEvent also sets a timer for the next bucket if no element has been seen for it yet.
+      processEvent(
+          () -> {
+            TimestampedValue<ValueT> previous = seenBucketMap.get(previousBucketEndTs);
+            if (previous == null) {
+              throw new IllegalStateException(
+                  "Processing bucket for "
+                      + bucketEndTs
+                      + " before processing bucket "
+                      + "for "
+                      + previousBucketEndTs);
+            }
+            ValueT value = previous.getValue();
+            if (interpolateFunction != null) {
+              BoundedWindow previousBucket = bucketWindows.assignWindow(previousBucketMaxTs);
+              BoundedWindow currentBucket = bucketWindows.assignWindow(bucketMaxTs);
+              Preconditions.checkState(
+                  !currentBucket.equals(previousBucket),
+                  "Interpolation requires distinct previous and current buckets");
+              value =
+                  interpolateFunction.apply(
+                      new AutoValue_FillGaps_InterpolateData<>(
+                          previous, previousBucket, currentBucket));
+            }
+            return TimestampedValue.of(value, bucketMaxTs);
+          },
+          bucketEndTs,
+          gapTimers,
+          gcTimers,
+          seenBuckets,
+          gapSize,
+          output);
+      if (!seenBucketMap.containsKey(bucketEndTs.plus(bucketWindows.getSize()))) {
+        // The next bucket is still empty, so record the extended gap length for its timer to read.
+        if (gapDurationsMap == null) {
+          gapDurationsMap = Maps.newTreeMap();
+        }
+        gapDurationsMap.put(bucketEndTs, gapSize + 1);
+        gapDurations.write(gapDurationsMap);
+      }
+    }
+
+    /**
+     * Garbage-collects per-key bookkeeping. Entries in both state maps keyed before the start of
+     * the GC window that ends at this timer's timestamp are dropped.
+     */
+    @OnTimerFamily("gcTimers")
+    public void onGcTimer(
+        @Timestamp Instant timerTs,
+        @AlwaysFetched @StateId("seenBuckets")
+            ValueState<SortedMap<Instant, TimestampedValue<ValueT>>> seenBuckets,
+        @AlwaysFetched @StateId("gapDurationMap")
+            ValueState<SortedMap<Instant, Long>> gapDurations) {
+      // GC timers are set at GC-window ends, so this is the start of that window.
+      Instant cutoff = timerTs.minus(gcWindows.getSize());
+      gcMap(seenBuckets, cutoff);
+      gcMap(gapDurations, cutoff);
+    }
+
+    // returns true if this is the first event for the bucket.
+    private boolean processEvent(
+        Supplier<TimestampedValue<ValueT>> getValue,
+        Instant bucketEndTs,
+        TimerMap gapTimers,
+        TimerMap gcTimers,
+        ValueState<SortedMap<Instant, TimestampedValue<ValueT>>> seenBuckets,
+        long gapSize,
+        OutputReceiver<ValueT> output) {
+      TimestampedValue<ValueT> value = getValue.get();
+      output.outputWithTimestamp(value.getValue(), value.getTimestamp());
+
+      boolean firstElementInBucket = true;
+      TimestampedValue<ValueT> valueToWrite = value;
+      SortedMap<Instant, TimestampedValue<ValueT>> seenBucketsMap = seenBuckets.read();
+      if (seenBucketsMap == null) {
+        seenBucketsMap = Maps.newTreeMap();
+      } else {
+        @Nullable TimestampedValue<ValueT> existing = seenBucketsMap.get(bucketEndTs);
+        if (existing != null) {
+          valueToWrite = mergeValues.apply(existing, value);
+          // No need to set a timer as we've already seen an element for this window before.
+          firstElementInBucket = false;
+        }
+      }
+
+      // Update the seenWindows state variable.
+      seenBucketsMap.put(bucketEndTs, valueToWrite);
+      seenBuckets.write(seenBucketsMap);
+
+      if (firstElementInBucket) {
+        // Potentially set a timer for the next window.
+
+        // Here we calculate how long the gap-extension duration (the total size of all gaps filled
+        // since
+        // the last element seen) would be for the next window. If this would exceed the max-gap
+        // duration
+        // set by the user, then stop.
+        Instant nextBucketEndTs = bucketEndTs.plus(bucketWindows.getSize());
+        Instant nextBucketMaxTs = nextBucketEndTs.minus(Duration.millis(1));
+        // Set a gap-filling timer for the next window if we haven't yet seen that window.
+        if (nextBucketMaxTs.isBefore(stopTime)
+            && gapSize + 1 < maxGapFillBuckets
+            && !seenBucketsMap.containsKey(nextBucketEndTs)) {
+          gapTimers
+              .get(windowToTimerTag(nextBucketEndTs))
+              .withOutputTimestamp(bucketEndTs)
+              .set(nextBucketEndTs);
+        }
+
+        // Set a gcTimer
+        Instant gcTs = gcWindows.assignWindow(nextBucketEndTs).end();
+        gcTimers.get(windowToTimerTag(gcTs)).set(gcTs);
+      }
+      return firstElementInBucket;
+    }
+
+    /**
+     * Removes every entry keyed strictly before {@code ts} from a sorted-map state cell. The cell
+     * is cleared entirely once nothing remains.
+     */
+    private static <V> void gcMap(ValueState<SortedMap<Instant, V>> mapState, Instant ts) {
+      SortedMap<Instant, V> contents = mapState.read();
+      if (contents == null) {
+        return;
+      }
+      // headMap is a live view, so clearing it removes those entries from contents itself.
+      contents.headMap(ts).clear();
+      if (contents.isEmpty()) {
+        mapState.clear();
+        return;
+      }
+      mapState.write(contents);
+    }
+  }
+}
diff --git a/sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/package-info.java b/sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/package-info.java
new file mode 100644
index 00000000000..305e0db0d0f
--- /dev/null
+++ b/sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Utilities for operating on timeseries data. */
+package org.apache.beam.sdk.extensions.timeseries;
diff --git a/sdks/java/extensions/timeseries/src/test/java/org/apache/beam/sdk/extensions/timeseries/FillGapsTest.java b/sdks/java/extensions/timeseries/src/test/java/org/apache/beam/sdk/extensions/timeseries/FillGapsTest.java
new file mode 100644
index 00000000000..da419ec1820
--- /dev/null
+++ b/sdks/java/extensions/timeseries/src/test/java/org/apache/beam/sdk/extensions/timeseries/FillGapsTest.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.timeseries;
+
+import com.google.auto.value.AutoValue;
+import java.util.List;
+import java.util.Random;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesLoopingTimer;
+import org.apache.beam.sdk.testing.UsesStatefulParDo;
+import org.apache.beam.sdk.testing.UsesStrictTimerOrdering;
+import org.apache.beam.sdk.testing.UsesTimersInParDo;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Reify;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class FillGapsTest {
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  abstract static class Message {
+    abstract String getKey();
+
+    abstract String getValue();
+
+    abstract Instant getTimestamp();
+
+    static Message update(FillGaps.InterpolateData<Message> interpolateData) {
+      Message value = interpolateData.getValue().getValue();
+      Instant nextWindowMax = interpolateData.getNextWindow().maxTimestamp();
+      return value.toBuilder().setTimestamp(nextWindowMax).build();
+    }
+
+    static Message of(String key, String value, Instant timestamp) {
+      return new AutoValue_FillGapsTest_Message.Builder()
+          .setKey(key)
+          .setValue(value)
+          .setTimestamp(timestamp)
+          .build();
+    }
+
+    static TimestampedValue<Message> ofTimestamped(String key, String value, Instant timestamp) {
+      return TimestampedValue.of(of(key, value, timestamp), timestamp);
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setKey(String key);
+
+      abstract Builder setValue(String value);
+
+      abstract Builder setTimestamp(Instant timestamp);
+
+      abstract Message build();
+    }
+
+    abstract Builder toBuilder();
+  }
+
+  @Test
+  public void testFillGaps() {
+    List<TimestampedValue<Message>> values =
+        ImmutableList.of(
+            Message.ofTimestamped("key1", "value0<", Instant.ofEpochSecond(0)),
+            Message.ofTimestamped("key2", "value0", Instant.ofEpochSecond(0)),
+            Message.ofTimestamped("key1", "value1<", Instant.ofEpochSecond(1)),
+            Message.ofTimestamped(
+                "key1", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+            Message.ofTimestamped("key2", "value1<", Instant.ofEpochSecond(1)),
+            Message.ofTimestamped(
+                "key2", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+            Message.ofTimestamped("key1", "value3", Instant.ofEpochSecond(3)),
+            Message.ofTimestamped("key2", "value3", Instant.ofEpochSecond(3)));
+
+    PCollection<Message> input = pipeline.apply(Create.timestamped(values));
+    PCollection<TimestampedValue<Message>> gapFilled =
+        input
+            .apply(
+                FillGaps.<Message>of(Duration.standardSeconds(1), "key")
+                    .withStopTime(Instant.ofEpochSecond(5)))
+            .apply(Reify.timestamps());
+
+    FixedWindows fixedWindows = FixedWindows.of(Duration.standardSeconds(1));
+    PAssert.that(gapFilled)
+        .containsInAnyOrder(
+            Iterables.concat(
+                values,
+                ImmutableList.of(
+                    TimestampedValue.of(
+                        Message.of(
+                            "key1", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(2)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of(
+                            "key2", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(2)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key1", "value3", Instant.ofEpochSecond(3)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(4)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key2", "value3", Instant.ofEpochSecond(3)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(4)).maxTimestamp()))));
+    pipeline.run();
+  }
+
+  @Test
+  public void testFillGapsKeepEarliest() {
+    List<TimestampedValue<Message>> values =
+        ImmutableList.of(
+            Message.ofTimestamped("key1", "value0<", Instant.ofEpochSecond(0)),
+            Message.ofTimestamped("key2", "value0", Instant.ofEpochSecond(0)),
+            Message.ofTimestamped("key1", "value1<", Instant.ofEpochSecond(1)),
+            Message.ofTimestamped(
+                "key1", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+            Message.ofTimestamped("key2", "value1<", Instant.ofEpochSecond(1)),
+            Message.ofTimestamped(
+                "key2", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+            Message.ofTimestamped("key1", "value3", Instant.ofEpochSecond(3)),
+            Message.ofTimestamped("key2", "value3", Instant.ofEpochSecond(3)));
+
+    PCollection<Message> input = pipeline.apply(Create.timestamped(values));
+    PCollection<TimestampedValue<Message>> gapFilled =
+        input
+            .apply(
+                FillGaps.<Message>of(Duration.standardSeconds(1), "key")
+                    .withMergeFunction(FillGaps.keepEarliest())
+                    .withStopTime(Instant.ofEpochSecond(5)))
+            .apply(Reify.timestamps());
+
+    FixedWindows fixedWindows = FixedWindows.of(Duration.standardSeconds(1));
+    PAssert.that(gapFilled)
+        .containsInAnyOrder(
+            Iterables.concat(
+                values,
+                ImmutableList.of(
+                    TimestampedValue.of(
+                        Message.of("key1", "value1<", Instant.ofEpochSecond(1)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(2)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key2", "value1<", Instant.ofEpochSecond(1)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(2)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key1", "value3", Instant.ofEpochSecond(3)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(4)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key2", "value3", Instant.ofEpochSecond(3)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(4)).maxTimestamp()))));
+    pipeline.run();
+  }
+
+  @Test
+  public void testFillGapsMaxDuration() {
+    List<TimestampedValue<Message>> values =
+        ImmutableList.of(
+            Message.ofTimestamped("key1", "value0", Instant.ofEpochSecond(0)),
+            Message.ofTimestamped("key2", "value0", Instant.ofEpochSecond(0)),
+            Message.ofTimestamped("key1", "value1", Instant.ofEpochSecond(1)),
+            Message.ofTimestamped("key2", "value1", Instant.ofEpochSecond(1)),
+            Message.ofTimestamped("key1", "value3", Instant.ofEpochSecond(10)),
+            Message.ofTimestamped("key2", "value3", Instant.ofEpochSecond(10)));
+
+    PCollection<Message> input = pipeline.apply(Create.timestamped(values));
+    PCollection<TimestampedValue<Message>> gapFilled =
+        input
+            .apply(
+                FillGaps.<Message>of(Duration.standardSeconds(1), "key")
+                    .withMaxGapFillBuckets(4L)
+                    .withStopTime(Instant.ofEpochSecond(11)))
+            .apply(Reify.timestamps());
+
+    FixedWindows fixedWindows = FixedWindows.of(Duration.standardSeconds(1));
+    PAssert.that(gapFilled)
+        .containsInAnyOrder(
+            Iterables.concat(
+                values,
+                ImmutableList.of(
+                    TimestampedValue.of(
+                        Message.of("key1", "value1", Instant.ofEpochSecond(1)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(2)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key1", "value1", Instant.ofEpochSecond(1)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(3)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key1", "value1", Instant.ofEpochSecond(1)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(4)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key1", "value1", Instant.ofEpochSecond(1)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(5)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key2", "value1", Instant.ofEpochSecond(1)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(2)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key2", "value1", Instant.ofEpochSecond(1)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(3)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key2", "value1", Instant.ofEpochSecond(1)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(4)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key2", "value1", Instant.ofEpochSecond(1)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(5)).maxTimestamp()))));
+    pipeline.run();
+  }
+
+  @Test
+  public void testFillGapsPropagateFunction() {
+    List<TimestampedValue<Message>> values =
+        ImmutableList.of(
+            Message.ofTimestamped("key1", "value0<", Instant.ofEpochSecond(0)),
+            Message.ofTimestamped("key2", "value0", Instant.ofEpochSecond(0)),
+            Message.ofTimestamped("key1", "value1<", Instant.ofEpochSecond(1)),
+            Message.ofTimestamped(
+                "key1", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+            Message.ofTimestamped("key2", "value1<", Instant.ofEpochSecond(1)),
+            Message.ofTimestamped(
+                "key2", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+            Message.ofTimestamped("key1", "value3", Instant.ofEpochSecond(3)),
+            Message.ofTimestamped("key2", "value3", Instant.ofEpochSecond(3)));
+
+    PCollection<Message> input = pipeline.apply(Create.timestamped(values));
+    PCollection<TimestampedValue<Message>> gapFilled =
+        input
+            .apply(
+                FillGaps.<Message>of(Duration.standardSeconds(1), "key")
+                    .withInterpolateFunction(Message::update)
+                    .withStopTime(Instant.ofEpochSecond(5)))
+            .apply(Reify.timestamps());
+
+    FixedWindows fixedWindows = FixedWindows.of(Duration.standardSeconds(1));
+    Instant bucketTwoMax = fixedWindows.assignWindow(Instant.ofEpochSecond(2)).maxTimestamp();
+    Instant bucketFourMax = fixedWindows.assignWindow(Instant.ofEpochSecond(4)).maxTimestamp();
+
+    PAssert.that(gapFilled)
+        .containsInAnyOrder(
+            Iterables.concat(
+                values,
+                ImmutableList.of(
+                    Message.ofTimestamped("key1", "value1", bucketTwoMax),
+                    Message.ofTimestamped("key2", "value1", bucketTwoMax),
+                    Message.ofTimestamped("key1", "value3", bucketFourMax),
+                    Message.ofTimestamped("key2", "value3", bucketFourMax))));
+    pipeline.run();
+  }
+
+  // TODO: This test fails due to DirectRunner bugs. Uncomment once those bugs are fixed.
+  @Test
+  @Category({
+    UsesTimersInParDo.class,
+    UsesLoopingTimer.class,
+    UsesStatefulParDo.class,
+    UsesStrictTimerOrdering.class
+  })
+  public void testFillGapsFuzz() {
+    for (int i = 0; i < 6; ++i) {
+      fuzzTest(10, 500, 25, 20);
+    }
+  }
+
+  public void fuzzTest(int numKeys, int numBuckets, int maxGapSizeToGenerate, long maxGapSize) {
+    Pipeline p = Pipeline.create();
+    List<TimestampedValue<Message>> values = Lists.newArrayList();
+    List<TimestampedValue<Message>> expectedGaps = Lists.newArrayList();
+    for (int i = 0; i < numKeys; ++i) {
+      String key = "key" + i;
+      generateFuzzTimerseries(
+          key, numBuckets, maxGapSizeToGenerate, maxGapSize, values, expectedGaps);
+    }
+
+    PCollection<Message> input = p.apply(Create.timestamped(values));
+    PCollection<TimestampedValue<Message>> gapFilled =
+        input
+            .apply(
+                FillGaps.<Message>of(Duration.standardSeconds(1), "key")
+                    .withInterpolateFunction(Message::update)
+                    .withMaxGapFillBuckets(maxGapSize)
+                    .withStopTime(Instant.ofEpochSecond(numBuckets)))
+            .apply(Reify.timestamps());
+
+    PAssert.that(gapFilled).containsInAnyOrder(Iterables.concat(values, expectedGaps));
+    p.run();
+  }
+
+  void generateFuzzTimerseries(
+      String key,
+      int numBuckets,
+      int maxGapSizeToGenerate,
+      long maxGapSize,
+      List<TimestampedValue<Message>> values,
+      List<TimestampedValue<Message>> expectedGaps) {
+    Random random = new Random();
+    String lastValue = null;
+    int currentGapSize = 0;
+    for (int bucket = 0; bucket < numBuckets; ) {
+      if (lastValue != null && currentGapSize < maxGapSizeToGenerate && random.nextInt(10) == 0) {
+        // 10% chance of creating a gap.
+        int gapSize = random.nextInt(maxGapSizeToGenerate) + 1;
+        int lastGapBucket = Math.min(bucket + gapSize, numBuckets);
+
+        for (; bucket < lastGapBucket; ++bucket) {
+          if (currentGapSize < maxGapSize) {
+            addBucketToTimeseries(key, lastValue, bucket, expectedGaps);
+          }
+          ++currentGapSize;
+        }
+      } else {
+        lastValue = "bucket" + bucket;
+        currentGapSize = 0;
+        addBucketToTimeseries(key, lastValue, bucket, values);
+        ++bucket;
+      }
+    }
+  }
+
+  void addBucketToTimeseries(
+      String key, String value, int bucket, List<TimestampedValue<Message>> list) {
+    FixedWindows fixedWindows = FixedWindows.of(Duration.standardSeconds(1));
+    BoundedWindow currentBucket = fixedWindows.assignWindow(Instant.ofEpochSecond(bucket));
+    TimestampedValue<Message> message =
+        Message.ofTimestamped(key, value, currentBucket.maxTimestamp());
+    list.add(message);
+  }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 7cc83b9698b..357a99c5c65 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -145,6 +145,7 @@ include(":sdks:java:extensions:sql:zetasql")
 include(":sdks:java:extensions:sql:expansion-service")
 include(":sdks:java:extensions:sql:udf")
 include(":sdks:java:extensions:sql:udf-test-provider")
+include(":sdks:java:extensions:timeseries")
 include(":sdks:java:extensions:zetasketch")
 include(":sdks:java:fn-execution")
 include(":sdks:java:harness")