You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2022/07/13 20:11:40 UTC
[beam] branch master updated: Merge pull request #15786: Add gap-filling transform for timeseries
This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b78a080e932 Merge pull request #15786: Add gap-filling transform for timeseries
b78a080e932 is described below
commit b78a080e93282c83b56a788459090530069033f2
Author: Reuven Lax <re...@google.com>
AuthorDate: Wed Jul 13 13:11:34 2022 -0700
Merge pull request #15786: Add gap-filling transform for timeseries
---
.github/autolabeler.yml | 1 +
build.gradle.kts | 1 +
.../org/apache/beam/sdk/coders/SortedMapCoder.java | 197 ++++++++
.../beam/sdk/schemas/transforms/WithKeys.java | 79 +++
sdks/java/extensions/timeseries/build.gradle | 32 ++
.../beam/sdk/extensions/timeseries/FillGaps.java | 535 +++++++++++++++++++++
.../sdk/extensions/timeseries/package-info.java | 20 +
.../sdk/extensions/timeseries/FillGapsTest.java | 355 ++++++++++++++
settings.gradle.kts | 1 +
9 files changed, 1221 insertions(+)
diff --git a/.github/autolabeler.yml b/.github/autolabeler.yml
index 715abeddecf..888c5d4f3a4 100644
--- a/.github/autolabeler.yml
+++ b/.github/autolabeler.yml
@@ -42,6 +42,7 @@ extensions: ["sdks/java/extensions/**/*", "runners/extensions-java/**/*"]
"sketching": ["sdks/java/extensions/sketching/**/*"]
"sorter": ["sdks/java/extensions/sorter/**/*"]
"sql": ["sdks/java/extensions/sql/**/*"]
+"timeseries": ["sdks/java/extensions/timeseries/**/*"]
"zetasketch": ["sdks/java/extensions/zetasketch/**/*"]
# IO
diff --git a/build.gradle.kts b/build.gradle.kts
index aa5f8949fa5..7ea18895f77 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -195,6 +195,7 @@ tasks.register("javaPreCommitPortabilityApi") {
tasks.register("javaPostCommit") {
dependsOn(":sdks:java:extensions:google-cloud-platform-core:postCommit")
+ dependsOn(":sdks:java:extensions:timeseries:postCommit")
dependsOn(":sdks:java:extensions:zetasketch:postCommit")
dependsOn(":sdks:java:extensions:ml:postCommit")
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SortedMapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SortedMapCoder.java
new file mode 100644
index 00000000000..8448c915654
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SortedMapCoder.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+
+/**
+ * A {@link Coder} for {@link SortedMap Maps} that encodes them according to provided coders for
+ * keys and values.
+ *
+ * @param <K> the type of the keys of the KVs being transcoded
+ * @param <V> the type of the values of the KVs being transcoded
+ */
+public class SortedMapCoder<K extends Comparable<? super K>, V>
+ extends StructuredCoder<SortedMap<K, V>> {
+ /** Produces a MapCoder with the given keyCoder and valueCoder. */
+ public static <K extends Comparable<? super K>, V> SortedMapCoder<K, V> of(
+ Coder<K> keyCoder, Coder<V> valueCoder) {
+ return new SortedMapCoder<>(keyCoder, valueCoder);
+ }
+
+ public Coder<K> getKeyCoder() {
+ return keyCoder;
+ }
+
+ public Coder<V> getValueCoder() {
+ return valueCoder;
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ private Coder<K> keyCoder;
+ private Coder<V> valueCoder;
+
+ private SortedMapCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
+ this.keyCoder = keyCoder;
+ this.valueCoder = valueCoder;
+ }
+
+ @Override
+ public void encode(SortedMap<K, V> map, OutputStream outStream)
+ throws IOException, CoderException {
+ encode(map, outStream, Context.NESTED);
+ }
+
+ @Override
+ public void encode(SortedMap<K, V> map, OutputStream outStream, Context context)
+ throws IOException, CoderException {
+ if (map == null) {
+ throw new CoderException("cannot encode a null SortedMap");
+ }
+ DataOutputStream dataOutStream = new DataOutputStream(outStream);
+
+ int size = map.size();
+ dataOutStream.writeInt(size);
+ if (size == 0) {
+ return;
+ }
+
+ // Since we handled size == 0 above, entry is guaranteed to exist before and after loop
+ Iterator<Entry<K, V>> iterator = map.entrySet().iterator();
+ Entry<K, V> entry = iterator.next();
+ while (iterator.hasNext()) {
+ keyCoder.encode(entry.getKey(), outStream);
+ valueCoder.encode(entry.getValue(), outStream);
+ entry = iterator.next();
+ }
+
+ keyCoder.encode(entry.getKey(), outStream);
+ valueCoder.encode(entry.getValue(), outStream, context);
+ // no flush needed as DataOutputStream does not buffer
+ }
+
+ @Override
+ public SortedMap<K, V> decode(InputStream inStream) throws IOException, CoderException {
+ return decode(inStream, Context.NESTED);
+ }
+
+ @Override
+ public SortedMap<K, V> decode(InputStream inStream, Context context)
+ throws IOException, CoderException {
+ DataInputStream dataInStream = new DataInputStream(inStream);
+ int size = dataInStream.readInt();
+ if (size == 0) {
+ return Collections.emptySortedMap();
+ }
+
+ SortedMap<K, V> retval = Maps.newTreeMap();
+ for (int i = 0; i < size - 1; ++i) {
+ K key = keyCoder.decode(inStream);
+ V value = valueCoder.decode(inStream);
+ retval.put(key, value);
+ }
+
+ K key = keyCoder.decode(inStream);
+ V value = valueCoder.decode(inStream, context);
+ retval.put(key, value);
+ return retval;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return a {@link List} containing the key coder at index 0 at the and value coder at index 1.
+ */
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Arrays.asList(keyCoder, valueCoder);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @throws NonDeterministicException always. Not all maps have a deterministic encoding. For
+ * example, {@code HashMap} comparison does not depend on element order, so two {@code
+ * HashMap} instances may be equal but produce different encodings.
+ */
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ throw new NonDeterministicException(
+ this, "Ordering of entries in a Map may be non-deterministic.");
+ }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return keyCoder.consistentWithEquals() && valueCoder.consistentWithEquals();
+ }
+
+ @Override
+ public Object structuralValue(SortedMap<K, V> value) {
+ if (consistentWithEquals()) {
+ return value;
+ } else {
+ Map<Object, Object> ret = Maps.newHashMapWithExpectedSize(value.size());
+ for (Map.Entry<K, V> entry : value.entrySet()) {
+ ret.put(
+ keyCoder.structuralValue(entry.getKey()), valueCoder.structuralValue(entry.getValue()));
+ }
+ return ret;
+ }
+ }
+
+ @Override
+ public void registerByteSizeObserver(SortedMap<K, V> map, ElementByteSizeObserver observer)
+ throws Exception {
+ observer.update(4L);
+ if (map.isEmpty()) {
+ return;
+ }
+ Iterator<Entry<K, V>> entries = map.entrySet().iterator();
+ Entry<K, V> entry = entries.next();
+ while (entries.hasNext()) {
+ keyCoder.registerByteSizeObserver(entry.getKey(), observer);
+ valueCoder.registerByteSizeObserver(entry.getValue(), observer);
+ entry = entries.next();
+ }
+ keyCoder.registerByteSizeObserver(entry.getKey(), observer);
+ valueCoder.registerByteSizeObserver(entry.getValue(), observer);
+ }
+
+ @Override
+ public TypeDescriptor<SortedMap<K, V>> getEncodedTypeDescriptor() {
+ return new TypeDescriptor<SortedMap<K, V>>() {}.where(
+ new TypeParameter<K>() {}, keyCoder.getEncodedTypeDescriptor())
+ .where(new TypeParameter<V>() {}, valueCoder.getEncodedTypeDescriptor());
+ }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/WithKeys.java
new file mode 100644
index 00000000000..d057b75c677
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/WithKeys.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.utils.RowSelector;
+import org.apache.beam.sdk.schemas.utils.SelectHelpers;
+import org.apache.beam.sdk.schemas.utils.SelectHelpers.RowSelectorContainer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A {@link PTransform} that keys every element of a schema-aware {@link PCollection} by a {@link
+ * Row} built from the fields selected by a {@link FieldAccessDescriptor}.
+ *
+ * <p>The input {@link PCollection} must have a schema and a type descriptor. Each output element is
+ * a {@link KV} whose value is the original input element and whose key is the selected sub-row.
+ * Keys are encoded with a {@link SchemaCoder} for the selected sub-schema; values are encoded with a
+ * {@link SchemaCoder} built from the input's schema and row conversion functions.
+ *
+ * @param <T> the type of the input elements
+ */
+public class WithKeys<T> extends PTransform<PCollection<T>, PCollection<KV<Row, T>>> {
+  private final FieldAccessDescriptor fieldAccessDescriptor;
+
+  /**
+   * Returns a transform that keys each element by the fields described by {@code
+   * fieldAccessDescriptor}. The descriptor is resolved against the input schema at expansion time.
+   */
+  public static <T> WithKeys<T> of(FieldAccessDescriptor fieldAccessDescriptor) {
+    return new WithKeys<>(fieldAccessDescriptor);
+  }
+
+  private WithKeys(FieldAccessDescriptor fieldAccessDescriptor) {
+    this.fieldAccessDescriptor = fieldAccessDescriptor;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws IllegalArgumentException if the input has no type descriptor
+   */
+  @Override
+  public PCollection<KV<Row, T>> expand(PCollection<T> input) {
+    Schema schema = input.getSchema();
+    TypeDescriptor<T> typeDescriptor = input.getTypeDescriptor();
+    if (typeDescriptor == null) {
+      throw new IllegalArgumentException("Null type descriptor on input.");
+    }
+    SerializableFunction<T, Row> toRowFunction = input.getToRowFunction();
+    SerializableFunction<Row, T> fromRowFunction = input.getFromRowFunction();
+
+    // Resolve the field selection once, at construction time, and reuse it for every element.
+    FieldAccessDescriptor resolved = fieldAccessDescriptor.resolve(schema);
+    RowSelector rowSelector = new RowSelectorContainer(schema, resolved, true);
+    Schema keySchema = SelectHelpers.getOutputSchema(schema, resolved);
+
+    return input
+        .apply(
+            "selectKeys",
+            ParDo.of(
+                new DoFn<T, KV<Row, T>>() {
+                  @ProcessElement
+                  public void process(
+                      @Element Row row, // Beam will convert the element to a row.
+                      @Element T element, // Beam will return the original element.
+                      OutputReceiver<KV<Row, T>> o) {
+                    o.output(KV.of(rowSelector.select(row), element));
+                  }
+                }))
+        .setCoder(
+            KvCoder.of(
+                SchemaCoder.of(keySchema),
+                SchemaCoder.of(schema, typeDescriptor, toRowFunction, fromRowFunction)));
+  }
+}
diff --git a/sdks/java/extensions/timeseries/build.gradle b/sdks/java/extensions/timeseries/build.gradle
new file mode 100644
index 00000000000..fd8b961b0ad
--- /dev/null
+++ b/sdks/java/extensions/timeseries/build.gradle
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Build script for the Java timeseries extension module.
+plugins { id 'org.apache.beam.module' }
+// Applies Beam's standard Java build conventions; automaticModuleName sets the JPMS module name.
+applyJavaNature(
+ automaticModuleName: 'org.apache.beam.sdk.extensions.timeseries'
+)
+
+description = "Apache Beam :: SDKs :: Java :: Extensions :: Timeseries"
+
+dependencies {
+ // Compile/runtime dependencies: vendored Guava, Joda-Time, and the shaded Beam Java core SDK.
+ implementation library.java.vendored_guava_26_0_jre
+ implementation library.java.joda_time
+ implementation project(path: ":sdks:java:core", configuration: "shadow")
+ // Tests use JUnit and run pipelines on the direct runner, which is only needed at test runtime.
+ testImplementation library.java.junit
+ testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
+}
diff --git a/sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/FillGaps.java b/sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/FillGaps.java
new file mode 100644
index 00000000000..1e33f292b83
--- /dev/null
+++ b/sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/FillGaps.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.timeseries;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.SortedMapCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.transforms.WithKeys;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableBiFunction;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Fill gaps in timeseries. Values are expected to have Beam schemas registered.
+ *
+ * <p>This transform views the original PCollection as a collection of timeseries, each with a different key. They
+ * key to be used and the timeseries bucket size are both specified in the {@link #of} creation method. Multiple
+ * fields can be specified for the key - the key extracted will be a composite of all of them. Any elements in the
+ * original {@link PCollection} will appear unchanged in the output PCollection, with timestamp and window unchanged.
+ * Any gaps in timeseries (i.e. buckets with no elements) will be filled in the output PCollection with a single element
+ * (by default the latest element seen or propagated into the previous bucket). The timestamp of the filled element is
+ * the end of the bucket, and the original PCollection's window function is used to assign it to a window.
+ *
+ *
+ * <p>Example usage: the following code views each user,country pair in the input {@link PCollection} as a timeseries
+ * with bucket size one second. If any of these timeseries has a bucket with no elements, then the latest element from
+ * the previous bucket (i.e. the one with the largest timestamp) wil be propagated into the missing bucket. If there
+ * are multiple missing buckets, then they all will be filled up to 1 hour - the maximum gap size specified in
+ * {@link #withMaxGapFillBuckets}.
+ *
+ * <pre>{@code PCollection<MyType> input = readInput();
+ * PCollection<MyType> gapFilled =
+ * input.apply("fillGaps",
+ * FillGaps.of(Duration.standardSeconds(1), "userId", "country")
+ * .withMaxGapFillBuckets(3600L)));
+ * gapFilled.apply(MySink.create());
+ * }</pre>
+ *
+ * <p>By default, the latest element from the previous bucket is propagated into missing buckets. The user can override
+ * this using the {@link #withMergeFunction} method. Several built-in merge functions are provided for -
+ * {@link #keepLatest()} (the default), {@link #keepEarliest()}, an {@link #keepNull()}.
+ *
+ * <p>Sometimes elements need to be modified before being propagated into a missing bucket. For example, consider the
+ * following element type containing a timestamp:
+ *
+ * <pre>{@code @DefaultSchema(JavaFieldSchema.class)
+ * class MyType {
+ * MyData data;
+ * Instant timestamp;
+ * @SchemaCreate
+ * MyType(MyData data, Instant timestamp) {
+ * this.data = data;
+ * this.timestamp - timestamp;
+ * }
+ * })</pre>
+ *
+ * The element timestamps should always be contained in its current timeseries bucket, so the element needs to be
+ * modified when propagated to a new bucket. This can be done using the {@link #withInterpolateFunction}} method, as
+ * follows:
+ *
+ * <pre>{@code PCollection<MyType> input = readInput();
+ * PCollection<MyType> gapFilled =
+ * input.apply("fillGaps",
+ * FillGaps.of(Duration.standardSeconds(1), "userId", "country")
+ * .withInterpolateFunction(p -> new MyType(p.getValue().getValue().data, p.getNextWindow().maxTimestamp()))
+ * .withMaxGapFillBuckets(360L)));
+ * gapFilled.apply(MySink.create());
+ * }</pre>
+ */
+@AutoValue
+public abstract class FillGaps<ValueT>
+ extends PTransform<PCollection<ValueT>, PCollection<ValueT>> {
+ // We garbage collect every 60 windows by default.
+ private static final int GC_EVERY_N_BUCKETS = 60;
+
+ /**
+ * Argument to {@link #withMergeFunction}. Always propagates the element with the latest
+ * timestamp.
+ */
+ public static <ValueT>
+ SerializableBiFunction<
+ TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+ keepLatest() {
+ return (v1, v2) -> v1.getTimestamp().isAfter(v2.getTimestamp()) ? v1 : v2;
+ }
+
+ /**
+ * Argument to {@link #withMergeFunction}. Always propagates the element with the earliest
+ * timestamp.
+ */
+ public static <ValueT>
+ SerializableBiFunction<
+ TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+ keepEarliest() {
+ return (v1, v2) -> v1.getTimestamp().isAfter(v2.getTimestamp()) ? v2 : v1;
+ }
+
+ /** Argument to withInterpolateFunction function. */
+ @AutoValue
+ public abstract static class InterpolateData<ValueT> {
+ public abstract TimestampedValue<ValueT> getValue();
+
+ public abstract BoundedWindow getPreviousWindow();
+
+ public abstract BoundedWindow getNextWindow();
+ }
+
+ abstract Duration getTimeseriesBucketDuration();
+
+ abstract Long getMaxGapFillBuckets();
+
+ abstract Instant getStopTime();
+
+ abstract FieldAccessDescriptor getKeyDescriptor();
+
+ abstract SerializableBiFunction<
+ TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+ getMergeValues();
+
+ abstract int getGcEveryNBuckets();
+
+ @Nullable
+ abstract SerializableFunction<InterpolateData<ValueT>, ValueT> getInterpolateFunction();
+
+ abstract Builder<ValueT> toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder<ValueT> {
+ abstract Builder<ValueT> setTimeseriesBucketDuration(Duration value);
+
+ abstract Builder<ValueT> setMaxGapFillBuckets(Long value);
+
+ abstract Builder<ValueT> setStopTime(Instant value);
+
+ abstract Builder<ValueT> setKeyDescriptor(FieldAccessDescriptor keyDescriptor);
+
+ abstract Builder<ValueT> setMergeValues(
+ SerializableBiFunction<
+ TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+ mergeValues);
+
+ abstract Builder<ValueT> setInterpolateFunction(
+ @Nullable SerializableFunction<InterpolateData<ValueT>, ValueT> interpolateFunction);
+
+ abstract Builder<ValueT> setGcEveryNBuckets(int gcEveryNBuckets);
+
+ abstract FillGaps<ValueT> build();
+ }
+
+ /** Construct the transform for the given duration and key fields. */
+ public static <ValueT> FillGaps<ValueT> of(Duration windowDuration, String... keys) {
+ return of(windowDuration, FieldAccessDescriptor.withFieldNames(keys));
+ }
+
+ /** Construct the transform for the given duration and key fields. */
+ public static <ValueT> FillGaps<ValueT> of(
+ Duration windowDuration, FieldAccessDescriptor keyDescriptor) {
+ return new AutoValue_FillGaps.Builder<ValueT>()
+ .setTimeseriesBucketDuration(windowDuration)
+ .setMaxGapFillBuckets(Long.MAX_VALUE)
+ .setStopTime(BoundedWindow.TIMESTAMP_MAX_VALUE)
+ .setKeyDescriptor(keyDescriptor)
+ .setMergeValues(keepLatest())
+ .setGcEveryNBuckets(GC_EVERY_N_BUCKETS)
+ .build();
+ }
+
+ /* The max gap duration that will be filled. The transform will stop filling timeseries buckets after this duration. */
+ FillGaps<ValueT> withMaxGapFillBuckets(Long value) {
+ return toBuilder().setMaxGapFillBuckets(value).build();
+ }
+
+ /* A hard (event-time) stop time for the transform. */
+ FillGaps<ValueT> withStopTime(Instant stopTime) {
+ return toBuilder().setStopTime(stopTime).build();
+ }
+
+ /**
+ * If there are multiple values in a single timeseries bucket, this function is used to specify
+ * what to propagate to the next bucket. If not specified, then the value with the latest
+ * timestamp will be propagated.
+ */
+ FillGaps<ValueT> withMergeFunction(
+ SerializableBiFunction<
+ TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+ mergeFunction) {
+ return toBuilder().setMergeValues(mergeFunction).build();
+ }
+
+ /**
+ * This function can be used to modify elements before propagating to the next bucket. A common
+ * use case is to modify a contained timestamp to match that of the new bucket.
+ */
+ FillGaps<ValueT> withInterpolateFunction(
+ SerializableFunction<InterpolateData<ValueT>, ValueT> interpolateFunction) {
+ return toBuilder().setInterpolateFunction(interpolateFunction).build();
+ }
+
+ @Override
+ public PCollection<ValueT> expand(PCollection<ValueT> input) {
+ if (!input.hasSchema()) {
+ throw new RuntimeException("The input to FillGaps must have a schema.");
+ }
+
+ FixedWindows bucketWindows = FixedWindows.of(getTimeseriesBucketDuration());
+ // TODO(reuvenlax, BEAM-12795): We need to create KVs to use state/timers. Once BEAM-12795 is
+ // fixed we can dispense with the KVs here.
+ PCollection<KV<Row, ValueT>> keyedValues =
+ input
+ .apply("FixedWindow", Window.into(bucketWindows))
+ .apply("withKeys", WithKeys.of(getKeyDescriptor()));
+
+ WindowFn<ValueT, BoundedWindow> originalWindowFn =
+ (WindowFn<ValueT, BoundedWindow>) input.getWindowingStrategy().getWindowFn();
+ return keyedValues
+ .apply("globalWindow", Window.into(new GlobalWindows()))
+ .apply(
+ "fillGaps",
+ ParDo.of(
+ new FillGapsDoFn<>(
+ bucketWindows,
+ input.getCoder(),
+ getStopTime(),
+ getMaxGapFillBuckets(),
+ getMergeValues(),
+ getInterpolateFunction(),
+ getGcEveryNBuckets())))
+ .apply("applyOriginalWindow", Window.into(originalWindowFn))
+ .setCoder(input.getCoder());
+ }
+
+ public static class FillGapsDoFn<ValueT> extends DoFn<KV<Row, ValueT>, ValueT> {
+ // The window size used.
+ private final FixedWindows bucketWindows;
+ // The garbage-collection window (GC_EVERY_N_BUCKETS * fixedWindows.getSize()).
+ private final FixedWindows gcWindows;
+ // The stop time.
+ private final Instant stopTime;
+ // The max gap-duration to fill. Once the gap fill exceeds this, we will stop filling the gap.
+ private final long maxGapFillBuckets;
+
+ private final SerializableBiFunction<
+ TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+ mergeValues;
+
+ @Nullable
+ private final SerializableFunction<InterpolateData<ValueT>, ValueT> interpolateFunction;
+
+ // A timer map used to fill potential gaps. Each logical "window" will have a separate timer
+ // which will be cleared if an element arrives in that window. This way the timer will only fire
+ // if there is a gap, at which point it will fill the gap.
+ @TimerFamily("gapTimers")
+ @SuppressWarnings({"UnusedVariable"})
+ private final TimerSpec gapFillingTimersSpec = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+ // Timers used to garbage collect state.
+ @TimerFamily("gcTimers")
+ @SuppressWarnings({"UnusedVariable"})
+ private final TimerSpec gcTimersSpec = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+ // Keep track of windows already seen. In the future we can replace this with OrderedListState.
+ // Keyed by window end timestamp (which is 1ms greater than the window max timestamp).
+ @StateId("seenBuckets")
+ @SuppressWarnings({"UnusedVariable"})
+ private final StateSpec<ValueState<SortedMap<Instant, TimestampedValue<ValueT>>>>
+ seenBucketsSpec;
+
+ // For every window, keep track of how long the filled gap is in buckets. If a window was
+ // populated by a received element - i.e.
+ // it's not
+ // a gap fill - then there is no value in this map for that window.
+ // Keyed by window end timestamp (which is 1ms greater than the window max timestamp).
+ @StateId("gapDurationMap")
+ @SuppressWarnings({"UnusedVariable"})
+ private final StateSpec<ValueState<SortedMap<Instant, Long>>> gapDurationSpec;
+
+ FillGapsDoFn(
+ FixedWindows bucketWindows,
+ Coder<ValueT> valueCoder,
+ Instant stopTime,
+ long maxGapFillBuckets,
+ SerializableBiFunction<
+ TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+ mergeValues,
+ @Nullable SerializableFunction<InterpolateData<ValueT>, ValueT> interpolateFunction,
+ int gcEveryNBuckets) {
+ this.bucketWindows = bucketWindows;
+ this.gcWindows = FixedWindows.of(bucketWindows.getSize().multipliedBy(gcEveryNBuckets));
+ this.stopTime = stopTime;
+ this.maxGapFillBuckets = maxGapFillBuckets;
+ this.seenBucketsSpec =
+ StateSpecs.value(
+ SortedMapCoder.of(InstantCoder.of(), TimestampedValueCoder.of(valueCoder)));
+ this.gapDurationSpec =
+ StateSpecs.value(SortedMapCoder.of(InstantCoder.of(), VarLongCoder.of()));
+ this.mergeValues = mergeValues;
+ this.interpolateFunction = interpolateFunction;
+ }
+
+ @ProcessElement
+ public void process(
+ @Element KV<Row, ValueT> element,
+ @Timestamp Instant ts,
+ @TimerFamily("gapTimers") TimerMap gapTimers,
+ @TimerFamily("gcTimers") TimerMap gcTimers,
+ @AlwaysFetched @StateId("seenBuckets")
+ ValueState<SortedMap<Instant, TimestampedValue<ValueT>>> seenBuckets,
+ OutputReceiver<ValueT> output) {
+ if (ts.isAfter(stopTime)) {
+ return;
+ }
+
+ Instant windowEndTs = bucketWindows.assignWindow(ts).end();
+ if (processEvent(
+ () -> TimestampedValue.of(element.getValue(), ts),
+ windowEndTs,
+ gapTimers,
+ gcTimers,
+ seenBuckets,
+ -1,
+ output)) {
+ // We've seen data for this window, so clear any gap-filling timer.
+ gapTimers.get(windowToTimerTag(windowEndTs)).clear();
+ }
+ }
+
+ private String windowToTimerTag(Instant endTs) {
+ return Long.toString(endTs.getMillis());
+ }
+
+ private Instant windowFromTimerTag(String key) {
+ return Instant.ofEpochMilli(Long.parseLong(key));
+ }
+
+ @OnTimerFamily("gapTimers")
+ public void onTimer(
+ @TimerId String timerId,
+ @Timestamp Instant timestamp,
+ @TimerFamily("gapTimers") TimerMap gapTimers,
+ @TimerFamily("gcTimers") TimerMap gcTimers,
+ @AlwaysFetched @StateId("seenBuckets")
+ ValueState<SortedMap<Instant, TimestampedValue<ValueT>>> seenBuckets,
+ @AlwaysFetched @StateId("gapDurationMap") ValueState<SortedMap<Instant, Long>> gapDurations,
+ OutputReceiver<ValueT> output) {
+ Instant bucketEndTs = windowFromTimerTag(timerId);
+ Instant bucketMaxTs = bucketEndTs.minus(Duration.millis(1));
+ Instant previousBucketEndTs = bucketEndTs.minus(bucketWindows.getSize());
+ Instant previousBucketMaxTs = previousBucketEndTs.minus(Duration.millis(1));
+
+ Map<Instant, TimestampedValue<ValueT>> seenBucketMap = seenBuckets.read();
+ if (seenBucketMap == null) {
+ throw new RuntimeException("Unexpected timer fired with no seenBucketMap.");
+ }
+
+ @Nullable SortedMap<Instant, Long> gapDurationsMap = gapDurations.read();
+ long gapSize = 0;
+ if (gapDurationsMap != null) {
+ gapSize = gapDurationsMap.getOrDefault(previousBucketEndTs, 0L);
+ }
+ // If the timer fires and we've never seen an element for this window then we reach into the
+ // previous
+ // window and copy its value into this window. This relies on the fact that timers fire in
+ // order
+ // for a given key, so if there are multiple gap windows then the previous window will be
+ // filled
+ // by the time we get here.
+ // processEvent will also set a timer for the next window if we haven't already seen an
+ // element
+ // for that window.
+ processEvent(
+ () -> {
+ TimestampedValue<ValueT> previous = seenBucketMap.get(previousBucketEndTs);
+ if (previous == null) {
+ throw new RuntimeException(
+ "Processing bucket for "
+ + bucketEndTs
+ + " before processing bucket "
+ + "for "
+ + previousBucketEndTs);
+ }
+ ValueT value = previous.getValue();
+ if (interpolateFunction != null) {
+ BoundedWindow previousBucket = bucketWindows.assignWindow(previousBucketMaxTs);
+ BoundedWindow currentBucket = bucketWindows.assignWindow(bucketMaxTs);
+ Preconditions.checkState(!currentBucket.equals(previousBucket));
+ value =
+ interpolateFunction.apply(
+ new AutoValue_FillGaps_InterpolateData<>(
+ previous, previousBucket, currentBucket));
+ }
+ return TimestampedValue.of(value, bucketMaxTs);
+ },
+ bucketEndTs,
+ gapTimers,
+ gcTimers,
+ seenBuckets,
+ gapSize,
+ output);
+ if (!seenBucketMap.containsKey(bucketEndTs.plus(bucketWindows.getSize()))) {
+ // The next bucket is still empty, so update gapDurations
+ if (gapDurationsMap == null) {
+ gapDurationsMap = Maps.newTreeMap();
+ }
+ gapDurationsMap.put(bucketEndTs, gapSize + 1);
+ gapDurations.write(gapDurationsMap);
+ }
+ }
+
+ @OnTimerFamily("gcTimers")
+ public void onGcTimer(
+ @Timestamp Instant timerTs,
+ @AlwaysFetched @StateId("seenBuckets")
+ ValueState<SortedMap<Instant, TimestampedValue<ValueT>>> seenBuckets,
+ @AlwaysFetched @StateId("gapDurationMap")
+ ValueState<SortedMap<Instant, Long>> gapDurations) {
+ gcMap(seenBuckets, timerTs.minus(gcWindows.getSize()));
+ gcMap(gapDurations, timerTs.minus(gcWindows.getSize()));
+ }
+
+ // returns true if this is the first event for the bucket.
+ private boolean processEvent(
+ Supplier<TimestampedValue<ValueT>> getValue,
+ Instant bucketEndTs,
+ TimerMap gapTimers,
+ TimerMap gcTimers,
+ ValueState<SortedMap<Instant, TimestampedValue<ValueT>>> seenBuckets,
+ long gapSize,
+ OutputReceiver<ValueT> output) {
+ TimestampedValue<ValueT> value = getValue.get();
+ output.outputWithTimestamp(value.getValue(), value.getTimestamp());
+
+ boolean firstElementInBucket = true;
+ TimestampedValue<ValueT> valueToWrite = value;
+ SortedMap<Instant, TimestampedValue<ValueT>> seenBucketsMap = seenBuckets.read();
+ if (seenBucketsMap == null) {
+ seenBucketsMap = Maps.newTreeMap();
+ } else {
+ @Nullable TimestampedValue<ValueT> existing = seenBucketsMap.get(bucketEndTs);
+ if (existing != null) {
+ valueToWrite = mergeValues.apply(existing, value);
+ // No need to set a timer as we've already seen an element for this window before.
+ firstElementInBucket = false;
+ }
+ }
+
+ // Update the seenWindows state variable.
+ seenBucketsMap.put(bucketEndTs, valueToWrite);
+ seenBuckets.write(seenBucketsMap);
+
+ if (firstElementInBucket) {
+ // Potentially set a timer for the next window.
+
+ // Here we calculate how long the gap-extension duration (the total size of all gaps filled
+ // since
+ // the last element seen) would be for the next window. If this would exceed the max-gap
+ // duration
+ // set by the user, then stop.
+ Instant nextBucketEndTs = bucketEndTs.plus(bucketWindows.getSize());
+ Instant nextBucketMaxTs = nextBucketEndTs.minus(Duration.millis(1));
+ // Set a gap-filling timer for the next window if we haven't yet seen that window.
+ if (nextBucketMaxTs.isBefore(stopTime)
+ && gapSize + 1 < maxGapFillBuckets
+ && !seenBucketsMap.containsKey(nextBucketEndTs)) {
+ gapTimers
+ .get(windowToTimerTag(nextBucketEndTs))
+ .withOutputTimestamp(bucketEndTs)
+ .set(nextBucketEndTs);
+ }
+
+ // Set a gcTimer
+ Instant gcTs = gcWindows.assignWindow(nextBucketEndTs).end();
+ gcTimers.get(windowToTimerTag(gcTs)).set(gcTs);
+ }
+ return firstElementInBucket;
+ }
+
+ private static <V> void gcMap(ValueState<SortedMap<Instant, V>> mapState, Instant ts) {
+ SortedMap<Instant, V> map = mapState.read();
+ if (map != null) {
+ // Clear all map elements that are for windows strictly less than timerTs.
+ map.headMap(ts).clear();
+ if (map.isEmpty()) {
+ mapState.clear();
+ } else {
+ mapState.write(map);
+ }
+ }
+ }
+ }
+}
diff --git a/sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/package-info.java b/sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/package-info.java
new file mode 100644
index 00000000000..305e0db0d0f
--- /dev/null
+++ b/sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Utilities for operating on timeseries data. */
+package org.apache.beam.sdk.extensions.timeseries;
diff --git a/sdks/java/extensions/timeseries/src/test/java/org/apache/beam/sdk/extensions/timeseries/FillGapsTest.java b/sdks/java/extensions/timeseries/src/test/java/org/apache/beam/sdk/extensions/timeseries/FillGapsTest.java
new file mode 100644
index 00000000000..da419ec1820
--- /dev/null
+++ b/sdks/java/extensions/timeseries/src/test/java/org/apache/beam/sdk/extensions/timeseries/FillGapsTest.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.timeseries;
+
+import com.google.auto.value.AutoValue;
+import java.util.List;
+import java.util.Random;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesLoopingTimer;
+import org.apache.beam.sdk.testing.UsesStatefulParDo;
+import org.apache.beam.sdk.testing.UsesStrictTimerOrdering;
+import org.apache.beam.sdk.testing.UsesTimersInParDo;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Reify;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class FillGapsTest {
+ @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ abstract static class Message {
+ abstract String getKey();
+
+ abstract String getValue();
+
+ abstract Instant getTimestamp();
+
+ static Message update(FillGaps.InterpolateData<Message> interpolateData) {
+ Message value = interpolateData.getValue().getValue();
+ Instant nextWindowMax = interpolateData.getNextWindow().maxTimestamp();
+ return value.toBuilder().setTimestamp(nextWindowMax).build();
+ }
+
+ static Message of(String key, String value, Instant timestamp) {
+ return new AutoValue_FillGapsTest_Message.Builder()
+ .setKey(key)
+ .setValue(value)
+ .setTimestamp(timestamp)
+ .build();
+ }
+
+ static TimestampedValue<Message> ofTimestamped(String key, String value, Instant timestamp) {
+ return TimestampedValue.of(of(key, value, timestamp), timestamp);
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setKey(String key);
+
+ abstract Builder setValue(String value);
+
+ abstract Builder setTimestamp(Instant timestamp);
+
+ abstract Message build();
+ }
+
+ abstract Builder toBuilder();
+ }
+
+ @Test
+ public void testFillGaps() {
+ List<TimestampedValue<Message>> values =
+ ImmutableList.of(
+ Message.ofTimestamped("key1", "value0<", Instant.ofEpochSecond(0)),
+ Message.ofTimestamped("key2", "value0", Instant.ofEpochSecond(0)),
+ Message.ofTimestamped("key1", "value1<", Instant.ofEpochSecond(1)),
+ Message.ofTimestamped(
+ "key1", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+ Message.ofTimestamped("key2", "value1<", Instant.ofEpochSecond(1)),
+ Message.ofTimestamped(
+ "key2", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+ Message.ofTimestamped("key1", "value3", Instant.ofEpochSecond(3)),
+ Message.ofTimestamped("key2", "value3", Instant.ofEpochSecond(3)));
+
+ PCollection<Message> input = pipeline.apply(Create.timestamped(values));
+ PCollection<TimestampedValue<Message>> gapFilled =
+ input
+ .apply(
+ FillGaps.<Message>of(Duration.standardSeconds(1), "key")
+ .withStopTime(Instant.ofEpochSecond(5)))
+ .apply(Reify.timestamps());
+
+ FixedWindows fixedWindows = FixedWindows.of(Duration.standardSeconds(1));
+ PAssert.that(gapFilled)
+ .containsInAnyOrder(
+ Iterables.concat(
+ values,
+ ImmutableList.of(
+ TimestampedValue.of(
+ Message.of(
+ "key1", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+ fixedWindows.assignWindow(Instant.ofEpochSecond(2)).maxTimestamp()),
+ TimestampedValue.of(
+ Message.of(
+ "key2", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+ fixedWindows.assignWindow(Instant.ofEpochSecond(2)).maxTimestamp()),
+ TimestampedValue.of(
+ Message.of("key1", "value3", Instant.ofEpochSecond(3)),
+ fixedWindows.assignWindow(Instant.ofEpochSecond(4)).maxTimestamp()),
+ TimestampedValue.of(
+ Message.of("key2", "value3", Instant.ofEpochSecond(3)),
+ fixedWindows.assignWindow(Instant.ofEpochSecond(4)).maxTimestamp()))));
+ pipeline.run();
+ }
+
+ @Test
+ public void testFillGapsKeepEarliest() {
+ List<TimestampedValue<Message>> values =
+ ImmutableList.of(
+ Message.ofTimestamped("key1", "value0<", Instant.ofEpochSecond(0)),
+ Message.ofTimestamped("key2", "value0", Instant.ofEpochSecond(0)),
+ Message.ofTimestamped("key1", "value1<", Instant.ofEpochSecond(1)),
+ Message.ofTimestamped(
+ "key1", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+ Message.ofTimestamped("key2", "value1<", Instant.ofEpochSecond(1)),
+ Message.ofTimestamped(
+ "key2", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+ Message.ofTimestamped("key1", "value3", Instant.ofEpochSecond(3)),
+ Message.ofTimestamped("key2", "value3", Instant.ofEpochSecond(3)));
+
+ PCollection<Message> input = pipeline.apply(Create.timestamped(values));
+ PCollection<TimestampedValue<Message>> gapFilled =
+ input
+ .apply(
+ FillGaps.<Message>of(Duration.standardSeconds(1), "key")
+ .withMergeFunction(FillGaps.keepEarliest())
+ .withStopTime(Instant.ofEpochSecond(5)))
+ .apply(Reify.timestamps());
+
+ FixedWindows fixedWindows = FixedWindows.of(Duration.standardSeconds(1));
+ PAssert.that(gapFilled)
+ .containsInAnyOrder(
+ Iterables.concat(
+ values,
+ ImmutableList.of(
+ TimestampedValue.of(
+ Message.of("key1", "value1<", Instant.ofEpochSecond(1)),
+ fixedWindows.assignWindow(Instant.ofEpochSecond(2)).maxTimestamp()),
+ TimestampedValue.of(
+ Message.of("key2", "value1<", Instant.ofEpochSecond(1)),
+ fixedWindows.assignWindow(Instant.ofEpochSecond(2)).maxTimestamp()),
+ TimestampedValue.of(
+ Message.of("key1", "value3", Instant.ofEpochSecond(3)),
+ fixedWindows.assignWindow(Instant.ofEpochSecond(4)).maxTimestamp()),
+ TimestampedValue.of(
+ Message.of("key2", "value3", Instant.ofEpochSecond(3)),
+ fixedWindows.assignWindow(Instant.ofEpochSecond(4)).maxTimestamp()))));
+ pipeline.run();
+ }
+
+ @Test
+ public void testFillGapsMaxDuration() {
+ List<TimestampedValue<Message>> values =
+ ImmutableList.of(
+ Message.ofTimestamped("key1", "value0", Instant.ofEpochSecond(0)),
+ Message.ofTimestamped("key2", "value0", Instant.ofEpochSecond(0)),
+ Message.ofTimestamped("key1", "value1", Instant.ofEpochSecond(1)),
+ Message.ofTimestamped("key2", "value1", Instant.ofEpochSecond(1)),
+ Message.ofTimestamped("key1", "value3", Instant.ofEpochSecond(10)),
+ Message.ofTimestamped("key2", "value3", Instant.ofEpochSecond(10)));
+
+ PCollection<Message> input = pipeline.apply(Create.timestamped(values));
+ PCollection<TimestampedValue<Message>> gapFilled =
+ input
+ .apply(
+ FillGaps.<Message>of(Duration.standardSeconds(1), "key")
+ .withMaxGapFillBuckets(4L)
+ .withStopTime(Instant.ofEpochSecond(11)))
+ .apply(Reify.timestamps());
+
+ FixedWindows fixedWindows = FixedWindows.of(Duration.standardSeconds(1));
+ PAssert.that(gapFilled)
+ .containsInAnyOrder(
+ Iterables.concat(
+ values,
+ ImmutableList.of(
+ TimestampedValue.of(
+ Message.of("key1", "value1", Instant.ofEpochSecond(1)),
+ fixedWindows.assignWindow(Instant.ofEpochSecond(2)).maxTimestamp()),
+ TimestampedValue.of(
+ Message.of("key1", "value1", Instant.ofEpochSecond(1)),
+ fixedWindows.assignWindow(Instant.ofEpochSecond(3)).maxTimestamp()),
+ TimestampedValue.of(
+ Message.of("key1", "value1", Instant.ofEpochSecond(1)),
+ fixedWindows.assignWindow(Instant.ofEpochSecond(4)).maxTimestamp()),
+ TimestampedValue.of(
+ Message.of("key1", "value1", Instant.ofEpochSecond(1)),
+ fixedWindows.assignWindow(Instant.ofEpochSecond(5)).maxTimestamp()),
+ TimestampedValue.of(
+ Message.of("key2", "value1", Instant.ofEpochSecond(1)),
+ fixedWindows.assignWindow(Instant.ofEpochSecond(2)).maxTimestamp()),
+ TimestampedValue.of(
+ Message.of("key2", "value1", Instant.ofEpochSecond(1)),
+ fixedWindows.assignWindow(Instant.ofEpochSecond(3)).maxTimestamp()),
+ TimestampedValue.of(
+ Message.of("key2", "value1", Instant.ofEpochSecond(1)),
+ fixedWindows.assignWindow(Instant.ofEpochSecond(4)).maxTimestamp()),
+ TimestampedValue.of(
+ Message.of("key2", "value1", Instant.ofEpochSecond(1)),
+ fixedWindows.assignWindow(Instant.ofEpochSecond(5)).maxTimestamp()))));
+ pipeline.run();
+ }
+
+ @Test
+ public void testFillGapsPropagateFunction() {
+ List<TimestampedValue<Message>> values =
+ ImmutableList.of(
+ Message.ofTimestamped("key1", "value0<", Instant.ofEpochSecond(0)),
+ Message.ofTimestamped("key2", "value0", Instant.ofEpochSecond(0)),
+ Message.ofTimestamped("key1", "value1<", Instant.ofEpochSecond(1)),
+ Message.ofTimestamped(
+ "key1", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+ Message.ofTimestamped("key2", "value1<", Instant.ofEpochSecond(1)),
+ Message.ofTimestamped(
+ "key2", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+ Message.ofTimestamped("key1", "value3", Instant.ofEpochSecond(3)),
+ Message.ofTimestamped("key2", "value3", Instant.ofEpochSecond(3)));
+
+ PCollection<Message> input = pipeline.apply(Create.timestamped(values));
+ PCollection<TimestampedValue<Message>> gapFilled =
+ input
+ .apply(
+ FillGaps.<Message>of(Duration.standardSeconds(1), "key")
+ .withInterpolateFunction(Message::update)
+ .withStopTime(Instant.ofEpochSecond(5)))
+ .apply(Reify.timestamps());
+
+ FixedWindows fixedWindows = FixedWindows.of(Duration.standardSeconds(1));
+ Instant bucketTwoMax = fixedWindows.assignWindow(Instant.ofEpochSecond(2)).maxTimestamp();
+ Instant bucketFourMax = fixedWindows.assignWindow(Instant.ofEpochSecond(4)).maxTimestamp();
+
+ PAssert.that(gapFilled)
+ .containsInAnyOrder(
+ Iterables.concat(
+ values,
+ ImmutableList.of(
+ Message.ofTimestamped("key1", "value1", bucketTwoMax),
+ Message.ofTimestamped("key2", "value1", bucketTwoMax),
+ Message.ofTimestamped("key1", "value3", bucketFourMax),
+ Message.ofTimestamped("key2", "value3", bucketFourMax))));
+ pipeline.run();
+ }
+
+ // TODO: This test fails due to DirectRunner bugs. Uncomment once those bugs are fixed.
+ @Test
+ @Category({
+ UsesTimersInParDo.class,
+ UsesLoopingTimer.class,
+ UsesStatefulParDo.class,
+ UsesStrictTimerOrdering.class
+ })
+ public void testFillGapsFuzz() {
+ for (int i = 0; i < 6; ++i) {
+ fuzzTest(10, 500, 25, 20);
+ }
+ }
+
+ public void fuzzTest(int numKeys, int numBuckets, int maxGapSizeToGenerate, long maxGapSize) {
+ Pipeline p = Pipeline.create();
+ List<TimestampedValue<Message>> values = Lists.newArrayList();
+ List<TimestampedValue<Message>> expectedGaps = Lists.newArrayList();
+ for (int i = 0; i < numKeys; ++i) {
+ String key = "key" + i;
+ generateFuzzTimerseries(
+ key, numBuckets, maxGapSizeToGenerate, maxGapSize, values, expectedGaps);
+ }
+
+ PCollection<Message> input = p.apply(Create.timestamped(values));
+ PCollection<TimestampedValue<Message>> gapFilled =
+ input
+ .apply(
+ FillGaps.<Message>of(Duration.standardSeconds(1), "key")
+ .withInterpolateFunction(Message::update)
+ .withMaxGapFillBuckets(maxGapSize)
+ .withStopTime(Instant.ofEpochSecond(numBuckets)))
+ .apply(Reify.timestamps());
+
+ PAssert.that(gapFilled).containsInAnyOrder(Iterables.concat(values, expectedGaps));
+ p.run();
+ }
+
+ void generateFuzzTimerseries(
+ String key,
+ int numBuckets,
+ int maxGapSizeToGenerate,
+ long maxGapSize,
+ List<TimestampedValue<Message>> values,
+ List<TimestampedValue<Message>> expectedGaps) {
+ Random random = new Random();
+ String lastValue = null;
+ int currentGapSize = 0;
+ for (int bucket = 0; bucket < numBuckets; ) {
+ if (lastValue != null && currentGapSize < maxGapSizeToGenerate && random.nextInt(10) == 0) {
+ // 10% chance of creating a gap.
+ int gapSize = random.nextInt(maxGapSizeToGenerate) + 1;
+ int lastGapBucket = Math.min(bucket + gapSize, numBuckets);
+
+ for (; bucket < lastGapBucket; ++bucket) {
+ if (currentGapSize < maxGapSize) {
+ addBucketToTimeseries(key, lastValue, bucket, expectedGaps);
+ }
+ ++currentGapSize;
+ }
+ } else {
+ lastValue = "bucket" + bucket;
+ currentGapSize = 0;
+ addBucketToTimeseries(key, lastValue, bucket, values);
+ ++bucket;
+ }
+ }
+ }
+
+ void addBucketToTimeseries(
+ String key, String value, int bucket, List<TimestampedValue<Message>> list) {
+ FixedWindows fixedWindows = FixedWindows.of(Duration.standardSeconds(1));
+ BoundedWindow currentBucket = fixedWindows.assignWindow(Instant.ofEpochSecond(bucket));
+ TimestampedValue<Message> message =
+ Message.ofTimestamped(key, value, currentBucket.maxTimestamp());
+ list.add(message);
+ }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 7cc83b9698b..357a99c5c65 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -145,6 +145,7 @@ include(":sdks:java:extensions:sql:zetasql")
include(":sdks:java:extensions:sql:expansion-service")
include(":sdks:java:extensions:sql:udf")
include(":sdks:java:extensions:sql:udf-test-provider")
+include(":sdks:java:extensions:timeseries")
include(":sdks:java:extensions:zetasketch")
include(":sdks:java:fn-execution")
include(":sdks:java:harness")