Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/12/18 06:47:10 UTC

[GitHub] [beam] reuvenlax commented on a change in pull request #15786: Add gap-filling transform for timeseries

reuvenlax commented on a change in pull request #15786:
URL: https://github.com/apache/beam/pull/15786#discussion_r771791795



##########
File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/FillGaps.java
##########
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.timeseries;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.SortedMapCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.transforms.WithKeys;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableBiFunction;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Fill gaps in timeseries. Values are expected to have Beam schemas registered.
+ *
+ * <p>This transform views the original PCollection as a collection of timeseries, each with a different key. The
+ * key to be used and the timeseries bucket size are both specified in the {@link #of} creation method. Multiple
+ * fields can be specified for the key - the key extracted will be a composite of all of them. Any elements in the
+ * original {@link PCollection} will appear unchanged in the output PCollection, with timestamp and window unchanged.
+ * Any gaps in timeseries (i.e. buckets with no elements) will be filled in the output PCollection with a single element
+ * (by default the latest element seen or propagated into the previous bucket). The timestamp of the filled element is
+ * the end of the bucket, and the original PCollection's window function is used to assign it to a window.
+ *
+ * <p>Example usage: the following code views each (userId, country) pair in the input {@link PCollection} as a
+ * timeseries with a bucket size of one second. If any of these timeseries has a bucket with no elements, then the latest
+ * element from the previous bucket (i.e. the one with the largest timestamp) will be propagated into the missing bucket.
+ * If there are multiple missing buckets, they will all be filled, up to a maximum of 3600 buckets (one hour), as
+ * specified with {@link #withMaxGapFillBuckets}.
+ *
+ * <pre>{@code
+ * PCollection<MyType> input = readInput();
+ * PCollection<MyType> gapFilled =
+ *     input.apply(
+ *         "fillGaps",
+ *         FillGaps.<MyType>of(Duration.standardSeconds(1), "userId", "country")
+ *             .withMaxGapFillBuckets(3600L));
+ * gapFilled.apply(MySink.create());
+ * }</pre>
+ *
+ * <p>By default, the latest element from the previous bucket is propagated into missing buckets. The user can override
+ * this using the {@link #withMergeFunction} method. Several built-in merge functions are provided:
+ * {@link #keepLatest()} (the default), {@link #keepEarliest()}, and {@link #keepNull()}.
+ *
+ * <p>Sometimes elements need to be modified before being propagated into a missing bucket. For example, consider the
+ * following element type containing a timestamp:
+ *
+ * <pre>{@code @DefaultSchema(JavaFieldSchema.class)
+ * class MyType {
+ *   MyData data;
+ *   Instant timestamp;
+ *   @SchemaCreate
+ *   MyType(MyData data, Instant timestamp) {
+ *       this.data = data;
+ *       this.timestamp = timestamp;
+ *   }
+ * }
+ * }</pre>
+ *
+ * <p>Each element's timestamp should always fall within its current timeseries bucket, so the element needs to be
+ * modified when it is propagated to a new bucket. This can be done using the {@link #withPropagateFunction} method,
+ * as follows:
+ *
+ * <pre>{@code
+ * PCollection<MyType> input = readInput();
+ * PCollection<MyType> gapFilled =
+ *     input.apply(
+ *         "fillGaps",
+ *         FillGaps.<MyType>of(Duration.standardSeconds(1), "userId", "country")
+ *             .withPropagateFunction(
+ *                 p -> new MyType(p.getValue().getValue().data, p.getNextWindow().maxTimestamp()))
+ *             .withMaxGapFillBuckets(360L));
+ * gapFilled.apply(MySink.create());
+ * }</pre>
+ */
+@AutoValue
+public abstract class FillGaps<ValueT>
+    extends PTransform<PCollection<ValueT>, PCollection<ValueT>> {
+  /**
+   * Argument to {@link #withMergeFunction}. Always propagates the element with the latest
+   * timestamp.
+   */
+  public static <ValueT>
+      SerializableBiFunction<
+              TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+          keepLatest() {
+    return (v1, v2) -> v1.getTimestamp().isAfter(v2.getTimestamp()) ? v1 : v2;
+  }
+
+  /**
+   * Argument to {@link #withMergeFunction}. Always propagates the element with the earliest
+   * timestamp.
+   */
+  public static <ValueT>
+      SerializableBiFunction<
+              TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+          keepEarliest() {
+    return (v1, v2) -> v1.getTimestamp().isAfter(v2.getTimestamp()) ? v2 : v1;
+  }
+
+  /** Argument to {@link #withMergeFunction}. Always propagates a null value. */
+  @SuppressWarnings({"nullness"})
+  public static <ValueT>
+      SerializableBiFunction<
+              TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+          keepNull() {
+    return (v1, v2) -> null;
+  }
+
+  /** Argument to {@link #withPropagateFunction}. */
+  @AutoValue
+  public abstract static class PropagateData<ValueT> {
+    public abstract TimestampedValue<ValueT> getValue();
+
+    public abstract BoundedWindow getPreviousWindow();
+
+    public abstract BoundedWindow getNextWindow();
+  }
+
+  abstract Duration getTimeseriesBucketDuration();
+
+  abstract Long getMaxGapFillBuckets();
+
+  abstract Instant getStopTime();
+
+  abstract FieldAccessDescriptor getKeyDescriptor();
+
+  abstract SerializableBiFunction<
+          TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+      getMergeValues();
+
+  @Nullable
+  abstract SerializableFunction<PropagateData<ValueT>, ValueT> getPropagateFunction();
+
+  abstract Builder<ValueT> toBuilder();
+
+  @AutoValue.Builder
+  abstract static class Builder<ValueT> {
+    abstract Builder<ValueT> setTimeseriesBucketDuration(Duration value);
+
+    abstract Builder<ValueT> setMaxGapFillBuckets(Long value);
+
+    abstract Builder<ValueT> setStopTime(Instant value);
+
+    abstract Builder<ValueT> setKeyDescriptor(FieldAccessDescriptor keyDescriptor);
+
+    abstract Builder<ValueT> setMergeValues(
+        SerializableBiFunction<
+                TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+            mergeValues);
+
+    abstract Builder<ValueT> setPropagateFunction(
+        @Nullable SerializableFunction<PropagateData<ValueT>, ValueT> propagateFunction);
+
+    abstract FillGaps<ValueT> build();
+  }
+
+  /** Construct the transform for the given duration and key fields. */
+  public static <ValueT> FillGaps<ValueT> of(Duration windowDuration, String... keys) {
+    return of(windowDuration, FieldAccessDescriptor.withFieldNames(keys));
+  }
+
+  /** Construct the transform for the given duration and key fields. */
+  public static <ValueT> FillGaps<ValueT> of(
+      Duration windowDuration, FieldAccessDescriptor keyDescriptor) {
+    return new AutoValue_FillGaps.Builder<ValueT>()
+        .setTimeseriesBucketDuration(windowDuration)
+        .setMaxGapFillBuckets(Long.MAX_VALUE)
+        .setStopTime(BoundedWindow.TIMESTAMP_MAX_VALUE)
+        .setKeyDescriptor(keyDescriptor)
+        .setMergeValues(keepLatest())
+        .build();
+  }
+
+  /**
+   * The maximum number of consecutive buckets that will be filled. The transform stops filling a gap
+   * once it exceeds this many buckets.
+   */
+  public FillGaps<ValueT> withMaxGapFillBuckets(Long value) {
+    return toBuilder().setMaxGapFillBuckets(value).build();
+  }
+
+  /** A hard (event-time) stop time for the transform. */
+  public FillGaps<ValueT> withStopTime(Instant stopTime) {
+    return toBuilder().setStopTime(stopTime).build();
+  }
+
+  /**
+   * If there are multiple values in a single timeseries bucket, this function is used to specify
+   * what to propagate to the next bucket. If not specified, then the value with the latest
+   * timestamp will be propagated.
+   */
+  public FillGaps<ValueT> withMergeFunction(
+      SerializableBiFunction<
+              TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+          mergeFunction) {
+    return toBuilder().setMergeValues(mergeFunction).build();
+  }
+
+  /**
+   * This function can be used to modify elements before propagating to the next bucket. A common
+   * use case is to modify a contained timestamp to match that of the new bucket.
+   */
+  public FillGaps<ValueT> withPropagateFunction(
+      SerializableFunction<PropagateData<ValueT>, ValueT> propagateFunction) {
+    return toBuilder().setPropagateFunction(propagateFunction).build();
+  }
+
+  @Override
+  public PCollection<ValueT> expand(PCollection<ValueT> input) {
+    if (!input.hasSchema()) {
+      throw new RuntimeException("The input to FillGaps must have a schema.");
+    }
+
+    FixedWindows bucketWindows = FixedWindows.of(getTimeseriesBucketDuration());
+    // TODO(reuvenlax, BEAM-12795): We need to create KVs to use state/timers. Once BEAM-12795 is
+    // fixed we can dispense with the KVs here.
+    PCollection<KV<Row, ValueT>> keyedValues =
+        input
+            .apply("FixedWindow", Window.into(bucketWindows))
+            .apply("withKeys", WithKeys.of(getKeyDescriptor()));
+
+    WindowFn<ValueT, BoundedWindow> originalWindowFn =
+        (WindowFn<ValueT, BoundedWindow>) input.getWindowingStrategy().getWindowFn();
+    return keyedValues
+        .apply("globalWindow", Window.into(new GlobalWindows()))
+        .apply(
+            "fillGaps",
+            ParDo.of(
+                new FillGapsDoFn<>(
+                    bucketWindows,
+                    input.getCoder(),
+                    getStopTime(),
+                    getMaxGapFillBuckets(),
+                    getMergeValues(),
+                    getPropagateFunction())))
+        .apply("applyOriginalWindow", Window.into(originalWindowFn))
+        .setCoder(input.getCoder());
+  }
+
+  public static class FillGapsDoFn<ValueT> extends DoFn<KV<Row, ValueT>, ValueT> {
+    // We garbage collect every 60 windows by default.
+    private static final int GC_EVERY_N_BUCKETS = 60;
+    // The window size used.
+    private final FixedWindows bucketWindows;
+    // The garbage-collection window (GC_EVERY_N_BUCKETS * bucketWindows.getSize()).
+    private final FixedWindows gcWindows;
+    // The stop time.
+    private final Instant stopTime;
+    // The maximum number of buckets to fill. Once a gap exceeds this, we stop filling it.
+    private final long maxGapFillBuckets;
+
+    private final SerializableBiFunction<
+            TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+        mergeValues;
+
+    @Nullable private final SerializableFunction<PropagateData<ValueT>, ValueT> propagateFunction;
+
+    // A timer map used to fill potential gaps. Each logical "window" will have a separate timer
+    // which will be cleared if an element arrives in that window. This way the timer will only fire
+    // if there is a gap, at which point it will fill the gap.
+    @TimerFamily("gapTimers")
+    @SuppressWarnings({"UnusedVariable"})
+    private final TimerSpec gapFillingTimersSpec = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    // Timers used to garbage collect state.
+    @TimerFamily("gcTimers")
+    @SuppressWarnings({"UnusedVariable"})
+    private final TimerSpec gcTimersSpec = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    // Keep track of windows already seen. In the future we can replace this with OrderedListState.
+    // Keyed by window end timestamp (which is 1ms greater than the window max timestamp).
+    @StateId("seenBuckets")
+    @SuppressWarnings({"UnusedVariable"})
+    private final StateSpec<ValueState<SortedMap<Instant, TimestampedValue<ValueT>>>>
+        seenBucketsSpec;
+
+    // For every window, keep track of how long the filled gap is, in buckets. If a window was
+    // populated by a received element (i.e. it's not a gap fill), then there is no value in this
+    // map for that window.
+    // Keyed by window end timestamp (which is 1ms greater than the window max timestamp).
+    @StateId("gapDurationMap")
+    @SuppressWarnings({"UnusedVariable"})
+    private final StateSpec<ValueState<SortedMap<Instant, Long>>> gapDurationSpec;
+
+    FillGapsDoFn(
+        FixedWindows bucketWindows,
+        Coder<ValueT> valueCoder,
+        Instant stopTime,
+        long maxGapFillBuckets,
+        SerializableBiFunction<
+                TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+            mergeValues,
+        @Nullable SerializableFunction<PropagateData<ValueT>, ValueT> propagateFunction) {
+      this.bucketWindows = bucketWindows;
+      this.gcWindows = FixedWindows.of(bucketWindows.getSize().multipliedBy(GC_EVERY_N_BUCKETS));
+      this.stopTime = stopTime;
+      this.maxGapFillBuckets = maxGapFillBuckets;
+      this.seenBucketsSpec =
+          StateSpecs.value(
+              SortedMapCoder.of(InstantCoder.of(), TimestampedValueCoder.of(valueCoder)));
+      this.gapDurationSpec =
+          StateSpecs.value(SortedMapCoder.of(InstantCoder.of(), VarLongCoder.of()));
+      this.mergeValues = mergeValues;
+      this.propagateFunction = propagateFunction;
+    }
+
+    @ProcessElement
+    public void process(
+        @Element KV<Row, ValueT> element,
+        @Timestamp Instant ts,
+        @TimerFamily("gapTimers") TimerMap gapTimers,
+        @TimerFamily("gcTimers") TimerMap gcTimers,
+        @AlwaysFetched @StateId("seenBuckets")
+            ValueState<SortedMap<Instant, TimestampedValue<ValueT>>> seenBuckets,
+        OutputReceiver<ValueT> output) {
+      if (ts.isAfter(stopTime)) {

Review comment:
       What does "absolute" add? stopTime seems clear enough
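
To make the behaviour described in the FillGaps Javadoc above concrete, here is a minimal, Beam-free sketch of the same semantics for a single key. It is not the PR's implementation, which fills gaps incrementally with state and timer families; it is a batch simulation under stated assumptions: timestamps are epoch milliseconds, buckets are fixed windows aligned to epoch 0, each filler is stamped with its bucket's last millisecond (my reading of "the end of the bucket"), stopTime is ignored, and only gaps between observed buckets are filled. `GapFillSketch`, `Timestamped`, `bucketEnd`, and `fillGaps` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

/** Hypothetical, Beam-free simulation of FillGaps for a single key (batch, not streaming). */
public class GapFillSketch {
  /** Stand-in for Beam's TimestampedValue; timestamps are epoch milliseconds. */
  public static final class Timestamped<T> {
    private final T value;
    private final long timestamp;

    public Timestamped(T value, long timestamp) {
      this.value = value;
      this.timestamp = timestamp;
    }

    public T value() {
      return value;
    }

    public long timestamp() {
      return timestamp;
    }
  }

  /** Same choice as the PR's keepLatest(): on a tie, the second argument wins. */
  public static <T> BinaryOperator<Timestamped<T>> keepLatest() {
    return (v1, v2) -> v1.timestamp() > v2.timestamp() ? v1 : v2;
  }

  /** Exclusive end of the fixed bucket containing ts, with buckets aligned to epoch 0. */
  static long bucketEnd(long ts, long bucketMillis) {
    return Math.floorDiv(ts, bucketMillis) * bucketMillis + bucketMillis;
  }

  /**
   * Returns every input element unchanged, plus one filler per empty bucket between observed
   * buckets. A gap stops being filled after maxGapFillBuckets fillers.
   */
  public static <T> List<Timestamped<T>> fillGaps(
      List<Timestamped<T>> input,
      long bucketMillis,
      long maxGapFillBuckets,
      BinaryOperator<Timestamped<T>> merge) {
    // Collapse each bucket (keyed by its exclusive end) to the element that would be propagated.
    TreeMap<Long, Timestamped<T>> perBucket = new TreeMap<>();
    for (Timestamped<T> e : input) {
      perBucket.merge(bucketEnd(e.timestamp(), bucketMillis), e, merge);
    }
    List<Timestamped<T>> out = new ArrayList<>(input);
    Map.Entry<Long, Timestamped<T>> prev = null;
    for (Map.Entry<Long, Timestamped<T>> cur : perBucket.entrySet()) {
      if (prev != null) {
        long filled = 0;
        // Walk the empty buckets between prev and cur, copying prev's merged value forward.
        for (long end = prev.getKey() + bucketMillis;
            end < cur.getKey() && filled < maxGapFillBuckets;
            end += bucketMillis, filled++) {
          // Stamp the filler with the bucket's last millisecond so it stays inside that bucket.
          out.add(new Timestamped<>(prev.getValue().value(), end - 1));
        }
      }
      prev = cur;
    }
    return out;
  }
}
```

For example, with 1000 ms buckets and elements at 100 ms and 3500 ms, the two empty buckets in between each receive a copy of the first element, stamped 1999 and 2999; with `maxGapFillBuckets` set to 1, only the first of them is filled. Keying the per-bucket map by exclusive bucket end mirrors the "Keyed by window end timestamp" comments on the PR's state specs.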




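The `withPropagateFunction` example in the FillGaps Javadoc rewrites `MyType` so that its embedded timestamp falls inside the bucket it is propagated into. A Beam-free sketch of just that restamping step follows. `PropagateSketch`, `bucketStart`, `maxTimestamp`, and `propagate` are hypothetical names; `data` is simplified to a `String`; timestamps are epoch milliseconds; buckets are fixed windows aligned to epoch 0. It also assumes, as the Javadoc requires, that the element's embedded timestamp already lies in its current bucket, so the next bucket can be derived from it rather than passed in as `getNextWindow()`.

```java
/** Hypothetical Beam-free analogue of the MyType / withPropagateFunction Javadoc example. */
public class PropagateSketch {
  /** Stand-in for MyType: a payload plus an embedded event timestamp in epoch milliseconds. */
  public static final class MyType {
    public final String data;
    public final long timestamp;

    public MyType(String data, long timestamp) {
      this.data = data;
      this.timestamp = timestamp;
    }
  }

  /** Start of the fixed bucket containing ts, with buckets aligned to epoch 0. */
  public static long bucketStart(long ts, long bucketMillis) {
    return Math.floorDiv(ts, bucketMillis) * bucketMillis;
  }

  /** Last millisecond of the bucket starting at start (bucket end minus 1 ms). */
  public static long maxTimestamp(long start, long bucketMillis) {
    return start + bucketMillis - 1;
  }

  /**
   * Mirrors the Javadoc lambda p -> new MyType(data, p.getNextWindow().maxTimestamp()): keep the
   * payload, but move the embedded timestamp into the next bucket.
   */
  public static MyType propagate(MyType previous, long bucketMillis) {
    long nextStart = bucketStart(previous.timestamp, bucketMillis) + bucketMillis;
    return new MyType(previous.data, maxTimestamp(nextStart, bucketMillis));
  }
}
```

Because the new timestamp is the next bucket's last millisecond (end minus 1 ms, which is what Beam's `IntervalWindow.maxTimestamp()` returns), an element stamped 250 in the bucket `[0, 1000)` is restamped to 1999, and propagating it again yields 2999.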