You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "damccorm (via GitHub)" <gi...@apache.org> on 2024/02/02 14:26:01 UTC

Re: [PR] [RRIO] [Throttle] transform that slows down element transmission without an external resource [beam]

damccorm commented on code in PR #30123:
URL: https://github.com/apache/beam/pull/30123#discussion_r1476125284


##########
sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithoutExternalResource.java:
##########
@@ -17,41 +17,651 @@
  */
 package org.apache.beam.io.requestresponse;
 
+import static org.apache.beam.io.requestresponse.Monitoring.incIfPresent;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
 import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.SplittableRandom;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metric;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
 
 /**
  * {@link ThrottleWithoutExternalResource} throttles a {@link RequestT} {@link PCollection} emitting
  * a {@link RequestT} {@link PCollection} at a maximally configured rate, without using an external
  * resource.
  */
-// TODO(damondouglas): expand what "without external resource" means with respect to "with external
-//   resource" when the other throttle transforms implemented.
-//   See: https://github.com/apache/beam/issues/28932
 class ThrottleWithoutExternalResource<RequestT>
-    extends PTransform<PCollection<RequestT>, PCollection<RequestT>> {
+    extends PTransform<PCollection<RequestT>, Result<RequestT>> {
+
+  static final String DISTRIBUTION_METRIC_NAME = "milliseconds_between_element_emissions";
+  static final String INPUT_ELEMENTS_COUNTER_NAME = "input_elements_count";
+  static final String OUTPUT_ELEMENTS_COUNTER_NAME = "output_elements_count";
+
+  private final TupleTag<RequestT> outputTag = new TupleTag<RequestT>() {};
+  private final TupleTag<ApiIOError> errorTag = new TupleTag<ApiIOError>() {};
+
+  /**
+   * Instantiates a {@link ThrottleWithoutExternalResource} with the maximumRate of {@link Rate} and
+   * without collecting metrics.
+   */
+  static <RequestT> ThrottleWithoutExternalResource<RequestT> of(Rate maximumRate) {
+    return new ThrottleWithoutExternalResource<>(
+        Configuration.builder().setMaximumRate(maximumRate).build());
+  }
+
+  /** Returns {@link ThrottleWithoutExternalResource} with metrics collection turned on. */
+  ThrottleWithoutExternalResource<RequestT> withMetricsCollected() {
+    return new ThrottleWithoutExternalResource<>(
+        configuration.toBuilder().setCollectMetrics(true).build());
+  }
 
-  // TODO(damondouglas): remove suppress warnings when finally utilized in a future PR.
-  @SuppressWarnings({"unused"})
-  private final Configuration<RequestT> configuration;
+  private final Configuration configuration;
 
-  private ThrottleWithoutExternalResource(Configuration<RequestT> configuration) {
+  private ThrottleWithoutExternalResource(Configuration configuration) {
     this.configuration = configuration;
   }
 
   @Override
-  public PCollection<RequestT> expand(PCollection<RequestT> input) {
-    // TODO(damondouglas): expand in a future PR.
-    return input;
+  public Result<RequestT> expand(PCollection<RequestT> input) {
+    ListCoder<RequestT> listCoder = ListCoder.of(input.getCoder());
+    Coder<KV<Integer, List<RequestT>>> kvCoder = KvCoder.of(VarIntCoder.of(), listCoder);
+
+    PCollectionTuple pct =
+        input
+            // Break up the PCollection into fixed channels assigned to an int key [0,
+            // Rate::numElements).
+            .apply(AssignChannelFn.class.getSimpleName(), assignChannels())
+            // Apply GlobalWindows to prevent multiple window assignment.
+            .apply(
+                GlobalWindows.class.getSimpleName(),
+                Window.<KV<Integer, RequestT>>into(new GlobalWindows())
+                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
+                    .discardingFiredPanes())
+            // Apply GroupByKey to convert PCollection of KV<Integer, RequestT> to KV<Integer,
+            // Iterable<RequestT>>.
+            .apply(GroupByKey.class.getSimpleName(), GroupByKey.create())
+            // Convert KV<Integer, Iterable<RequestT>> to KV<Integer, List<RequestT>> for cleaner
+            // processing by ThrottleFn; IterableCoder uses a List for IterableLikeCoder's
+            // structuralValue.
+            .apply("ConvertToList", toList())
+            .setCoder(kvCoder)
+            // Finally, apply a splittable DoFn that splits the List<RequestT>, controlling the
+            // output via the watermark estimator.
+            .apply(ThrottleFn.class.getSimpleName(), throttle());
+
+    Result<RequestT> result = Result.of(input.getCoder(), outputTag, errorTag, pct);
+
+    // If configured to collect metrics, assign a single key to the global window timestamp and
+    // apply ComputeMetricsFn.
+    if (configuration.getCollectMetrics()) {
+      result
+          .getResponses()
+          .apply(
+              BoundedWindow.class.getSimpleName(),
+              WithKeys.of(ignored -> BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()))
+          .setCoder(KvCoder.of(VarLongCoder.of(), input.getCoder()))
+          .apply(ComputeMetricsFn.class.getSimpleName(), computeMetrics())
+          .setCoder(input.getCoder());
+    }
+
+    return result;
+  }
+
+  private ParDo.MultiOutput<KV<Integer, List<RequestT>>, RequestT> throttle() {
+    return ParDo.of(new ThrottleFn<RequestT>(configuration, outputTag))
+        .withOutputTags(outputTag, TupleTagList.of(errorTag));
+  }
+
+  /**
+   * This {@link DoFn} is inspired by {@link org.apache.beam.sdk.transforms.PeriodicSequence}'s DoFn
+   * implementation with the exception that instead of emitting an {@link Instant}, it emits a
+   * {@link RequestT}. Additionally, it uses an Integer-based {@link OffsetRange} and its associated
+   * {@link OffsetRangeTracker}. An Integer-based offset range is used because Java collection
+   * sizes are limited to int rather than long. Splittable DoFns provide access to hold the
+   * watermark and, together with outputting elements with a timestamp, allow the DoFn to emit
+   * elements at prescribed intervals.
+   */
+  static class ThrottleFn<RequestT> extends DoFn<KV<Integer, List<RequestT>>, RequestT> {
+
+    private final Configuration configuration;
+    private final TupleTag<RequestT> outputTag;
+    private @MonotonicNonNull Counter inputElementsCounter = null;
+    private @MonotonicNonNull Counter outputElementsCounter = null;
+
+    ThrottleFn(Configuration configuration, TupleTag<RequestT> outputTag) {
+      this.configuration = configuration;
+      this.outputTag = outputTag;
+    }
+
+    @Setup
+    public void setup() {
+      if (configuration.getCollectMetrics()) {
+        inputElementsCounter = Metrics.counter(ThrottleFn.class, INPUT_ELEMENTS_COUNTER_NAME);
+        outputElementsCounter = Metrics.counter(ThrottleFn.class, OUTPUT_ELEMENTS_COUNTER_NAME);
+      }
+    }
+
+    /**
+     * Instantiates an initial {@link RestrictionTracker.IsBounded#BOUNDED} {@link OffsetRange}
+     * restriction from [-1, {@link List#size()}). Defaults to [-1, 0) for null {@link
+     * KV#getValue()} elements.
+     */
+    @GetInitialRestriction
+    public OffsetRange getInitialRange(@Element KV<Integer, List<RequestT>> element) {
+      int size = 0;
+      if (element.getValue() != null) {
+        size = element.getValue().size();
+      }
+      return OffsetRange.ofSize(size);
+    }
+
+    /** Instantiates an {@link OffsetRangeTracker} from an {@link OffsetRange} instance. */
+    @NewTracker
+    public RestrictionTracker<OffsetRange, Integer> newTracker(
+        @Restriction OffsetRange restriction) {
+      return new OffsetRangeTracker(restriction);
+    }
+
+    /** Simply returns the {@link OffsetRange} restriction. */
+    @TruncateRestriction
+    public RestrictionTracker.TruncateResult<OffsetRange> truncate(
+        @Restriction OffsetRange restriction) {
+      return new RestrictionTracker.TruncateResult<OffsetRange>() {
+        @Override
+        public ThrottleWithoutExternalResource.@Nullable OffsetRange getTruncatedRestriction() {
+          return restriction;
+        }
+      };
+    }
+
+    /**
+     * This {@link GetInitialWatermarkEstimatorState} method initializes this DoFn's output
+     * watermark to a negative infinity timestamp via {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. The
+     * runner passes the {@link Instant} returned by this method as an argument to this DoFn's
+     * {@link #newWatermarkEstimator}.
+     */
+    @GetInitialWatermarkEstimatorState
+    public Instant getInitialWatermarkState() {
+      return BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+
+    /**
+     * This DoFn uses a {@link WatermarkEstimators.Manual} as its {@link NewWatermarkEstimator},
+     * instantiated from an {@link Instant}. The state argument in this method comes from the return
+     * of the {@link #getInitialWatermarkState}.
+     */
+    @NewWatermarkEstimator
+    public WatermarkEstimator<Instant> newWatermarkEstimator(
+        @WatermarkEstimatorState Instant state) {
+      return new WatermarkEstimators.Manual(state);
+    }
+
+    @ProcessElement
+    public void process(
+        @Element KV<Integer, List<RequestT>> element,
+        ManualWatermarkEstimator<Instant> estimator,
+        RestrictionTracker<OffsetRange, Integer> tracker,
+        MultiOutputReceiver receiver) {
+
+      int size = 0;
+      if (element.getValue() != null) {
+        size = element.getValue().size();
+      }
+
+      incIfPresent(inputElementsCounter, size);
+
+      if (element.getValue() == null || element.getValue().isEmpty()) {
+        return;
+      }
+
+      while (tracker.tryClaim(tracker.currentRestriction().getCurrent() + 1)) {
+        Instant nextEmittedTimestamp =
+            estimator.currentWatermark().plus(configuration.getMaximumRate().getInterval());

Review Comment:
   Rather than using the watermark here, can we use state to store the next emitted timestamp? That has stronger synchronization guarantees in the case of splits (which may each track their own watermark) or other elements arriving with new restriction trackers. I'm wondering if this is where the issues you've seen around ProcessContinuation are coming from.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org