You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2024/02/27 18:36:59 UTC

(beam) branch master updated: Remove unused code (#30432)

This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5af76b0de1a Remove unused code (#30432)
5af76b0de1a is described below

commit 5af76b0de1a2fb083e51d6f9be54f0989aabb0d0
Author: Damon <da...@users.noreply.github.com>
AuthorDate: Tue Feb 27 10:36:53 2024 -0800

    Remove unused code (#30432)
---
 sdks/java/io/rrio/build.gradle                     |   2 -
 .../apache/beam/io/requestresponse/ApiIOError.java |  25 +-
 .../org/apache/beam/io/requestresponse/Call.java   |   4 +-
 .../org/apache/beam/io/requestresponse/Quota.java  |  69 ----
 .../beam/io/requestresponse/RequestResponseIO.java |  53 ---
 .../ThrottleWithExternalResource.java              | 418 ---------------------
 .../ThrottleWithoutExternalResource.java           |  57 ---
 .../ThrottleWithExternalResourceIT.java            | 186 ---------
 .../ThrottleWithExternalResourceTest.java          |  77 ----
 9 files changed, 11 insertions(+), 880 deletions(-)

diff --git a/sdks/java/io/rrio/build.gradle b/sdks/java/io/rrio/build.gradle
index 4ecdf4e91df..9d51df4c1dc 100644
--- a/sdks/java/io/rrio/build.gradle
+++ b/sdks/java/io/rrio/build.gradle
@@ -34,8 +34,6 @@ dependencies {
     implementation project(path: ":sdks:java:core", configuration: "shadow")
     implementation library.java.joda_time
     implementation library.java.vendored_guava_32_1_2_jre
-    implementation library.java.jackson_core
-    implementation library.java.jackson_databind
     implementation "redis.clients:jedis:$jedisVersion"
 
     testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ApiIOError.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ApiIOError.java
index abb25bd33ba..5a3663fb103 100644
--- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ApiIOError.java
+++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ApiIOError.java
@@ -17,10 +17,6 @@
  */
 package org.apache.beam.io.requestresponse;
 
-import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.auto.value.AutoValue;
 import java.util.Optional;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
@@ -36,19 +32,18 @@ import org.joda.time.Instant;
 @AutoValue
 public abstract class ApiIOError {
 
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
   /**
    * Instantiate an {@link ApiIOError} from an {@link ErrorT} {@link T} element. The {@link T}
-   * element is converted to a JSON string.
+   * element is converted to a string by calling {@link Object#toString()}.
    */
-  static <T, ErrorT extends Exception> ApiIOError of(ErrorT e, T element)
-      throws JsonProcessingException {
-
-    String json = OBJECT_MAPPER.writeValueAsString(checkStateNotNull(element));
+  static <T, ErrorT extends Exception> ApiIOError of(ErrorT e, T element) {
+    String request = "";
+    if (element != null) {
+      request = element.toString();
+    }
 
     return ApiIOError.builder()
-        .setRequestAsJsonString(json)
+        .setRequestAsString(request)
         .setMessage(Optional.ofNullable(e.getMessage()).orElse(""))
         .setObservedTimestamp(Instant.now())
         .setStackTrace(Throwables.getStackTraceAsString(e))
@@ -59,8 +54,8 @@ public abstract class ApiIOError {
     return new AutoValue_ApiIOError.Builder();
   }
 
-  /** The JSON string representation of the request associated with the error. */
-  public abstract String getRequestAsJsonString();
+  /** The string representation of the request associated with the error. */
+  public abstract String getRequestAsString();
 
   /** The observed timestamp of the error. */
   public abstract Instant getObservedTimestamp();
@@ -74,7 +69,7 @@ public abstract class ApiIOError {
   @AutoValue.Builder
   abstract static class Builder {
 
-    abstract Builder setRequestAsJsonString(String value);
+    abstract Builder setRequestAsString(String value);
 
     abstract Builder setObservedTimestamp(Instant value);
 
diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
index 65038a8ffa3..b6941f8fcbb 100644
--- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
+++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
@@ -20,7 +20,6 @@ package org.apache.beam.io.requestresponse;
 import static org.apache.beam.io.requestresponse.Monitoring.incIfPresent;
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.auto.value.AutoValue;
 import java.io.IOException;
 import java.io.Serializable;
@@ -302,8 +301,7 @@ class Call<RequestT, ResponseT> extends PTransform<PCollection<RequestT>, Result
     }
 
     @ProcessElement
-    public void process(@Element RequestT request, MultiOutputReceiver receiver)
-        throws JsonProcessingException {
+    public void process(@Element RequestT request, MultiOutputReceiver receiver) {
 
       BackOff backOff = configuration.getBackOffSupplier().get();
       Sleeper sleeper = configuration.getSleeperSupplier().get();
diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Quota.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Quota.java
deleted file mode 100644
index 1adc46b836a..00000000000
--- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Quota.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.io.requestresponse;
-
-import java.io.Serializable;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import org.joda.time.Duration;
-
-/**
- * A data class that expresses a quota. Web API providers typically define a quota as the number of
- * requests per time interval.
- */
-public class Quota implements Serializable {
-  private final long numRequests;
-  private final Duration interval;
-
-  public Quota(long numRequests, Duration interval) {
-    this.numRequests = numRequests;
-    this.interval = interval;
-  }
-
-  /** The number of allowed requests. */
-  public long getNumRequests() {
-    return numRequests;
-  }
-
-  /** The duration context within which to allow requests. */
-  public Duration getInterval() {
-    return interval;
-  }
-
-  @Override
-  public boolean equals(@Nullable Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    Quota quota = (Quota) o;
-    return numRequests == quota.numRequests && Objects.equal(interval, quota.interval);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(numRequests, interval);
-  }
-
-  @Override
-  public String toString() {
-    return "Quota{" + "numRequests=" + numRequests + ", interval=" + interval + '}';
-  }
-}
diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java
index b7338cb64ff..9c5c6128c29 100644
--- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java
+++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java
@@ -107,8 +107,6 @@ public class RequestResponseIO<RequestT, ResponseT>
 
   private static final String CACHE_WRITE_NAME = "CacheWrite";
 
-  private static final String THROTTLE_NAME = "Throttle";
-
   private final TupleTag<ResponseT> responseTag = new TupleTag<ResponseT>() {};
   private final TupleTag<ApiIOError> failureTag = new TupleTag<ApiIOError>() {};
 
@@ -261,18 +259,6 @@ public class RequestResponseIO<RequestT, ResponseT>
         rrioConfiguration, callConfiguration.toBuilder().setMonitoringConfiguration(value).build());
   }
 
-  /**
-   * Configures {@link RequestResponseIO} with a {@link PTransform} that holds back {@link
-   * RequestT}s to prevent quota errors such as HTTP 429 or gRPC RESOURCE_EXHAUSTION errors.
-   */
-  // TODO(damondouglas): Until https://github.com/apache/beam/issues/28930 there is no provided
-  //  solution for this, however this method allows users to provide their own at this time.
-  public RequestResponseIO<RequestT, ResponseT> withPreventiveThrottle(
-      PTransform<PCollection<RequestT>, Result<RequestT>> throttle) {
-    return new RequestResponseIO<>(
-        rrioConfiguration.toBuilder().setThrottle(throttle).build(), callConfiguration);
-  }
-
   /** Exposes the transform's {@link Call.Configuration} for testing. */
   @VisibleForTesting
   Call.Configuration<RequestT, ResponseT> getCallConfiguration() {
@@ -308,9 +294,6 @@ public class RequestResponseIO<RequestT, ResponseT>
             PCollection<KV<RequestT, ResponseT>>, Result<KV<RequestT, ResponseT>>>
         getCacheWrite();
 
-    /** Throttles a {@link RequestT} {@link PCollection}. */
-    abstract @Nullable PTransform<PCollection<RequestT>, Result<RequestT>> getThrottle();
-
     abstract Builder<RequestT, ResponseT> toBuilder();
 
     @AutoValue.Builder
@@ -327,10 +310,6 @@ public class RequestResponseIO<RequestT, ResponseT>
       abstract Builder<RequestT, ResponseT> setCacheWrite(
           PTransform<PCollection<KV<RequestT, ResponseT>>, Result<KV<RequestT, ResponseT>>> value);
 
-      /** See {@link #getThrottle}. */
-      abstract Builder<RequestT, ResponseT> setThrottle(
-          PTransform<PCollection<RequestT>, Result<RequestT>> value);
-
       abstract Configuration<RequestT, ResponseT> build();
     }
   }
@@ -349,12 +328,6 @@ public class RequestResponseIO<RequestT, ResponseT>
     responseList = cacheRead.getMiddle();
     failureList = cacheRead.getRight();
 
-    // Throttle the RequestT input PCollection.
-    Pair<PCollection<RequestT>, PCollectionList<ApiIOError>> throttle =
-        expandThrottle(input, failureList);
-    input = throttle.getLeft();
-    failureList = throttle.getRight();
-
     // Invoke the Caller for each RequestT input and write associated RequestT and ResponseT to
     // the cache, if available.
     Pair<PCollectionList<ResponseT>, PCollectionList<ApiIOError>> call =
@@ -423,32 +396,6 @@ public class RequestResponseIO<RequestT, ResponseT>
     return Triple.of(input, responseList, failureList);
   }
 
-  /**
-   * Expands with {@link Configuration#getThrottle}, if available. Otherwise, returns a {@link Pair}
-   * of original arguments.
-   *
-   * <pre>Algorithm is as follows:
-   * <ol>
-   *   <li>Applies throttle transform to {@link RequestT} {@link PCollection} input.</li>
-   *   <li>Returns throttled {@link PCollection} of {@link RequestT} elements.</li>
-   *   <li>Returns appended {@link PCollection} of {@link ApiIOError}s to the failureList.</li>
-   * </ol></pre>
-   */
-  // TODO(damondouglas): See https://github.com/apache/beam/issues/28930; currently there is no
-  //  provided solution for this, though users could provide their own via withThrottle.
-  Pair<PCollection<RequestT>, PCollectionList<ApiIOError>> expandThrottle(
-      PCollection<RequestT> input, PCollectionList<ApiIOError> failureList) {
-
-    if (rrioConfiguration.getThrottle() == null) {
-      return Pair.of(input, failureList);
-    }
-
-    Result<RequestT> throttleResult =
-        input.apply(THROTTLE_NAME, checkStateNotNull(rrioConfiguration.getThrottle()));
-
-    return Pair.of(throttleResult.getResponses(), failureList.and(throttleResult.getFailures()));
-  }
-
   /**
    * Expands with a {@link Call} and {@link Configuration#getCacheWrite}, if available.
    *
diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResource.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResource.java
deleted file mode 100644
index 808f0b5d963..00000000000
--- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResource.java
+++ /dev/null
@@ -1,418 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.io.requestresponse;
-
-import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.PeriodicImpulse;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteSource;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Throttles a {@link T} {@link PCollection} using an external resource.
- *
- * <p>{@link ThrottleWithExternalResource} makes use of {@link PeriodicImpulse} as it needs to
- * coordinate three {@link PTransform}s concurrently. Usage of {@link ThrottleWithExternalResource}
- * should consider the impact of {@link PeriodicImpulse} on the pipeline.
- *
- * <p>Usage of {@link ThrottleWithExternalResource} is completely optional and serves as one of many
- * methods by {@link RequestResponseIO} to protect against API overuse. Usage should not depend on
- * {@link ThrottleWithExternalResource} alone to achieve API overuse prevention for several reasons.
- * The underlying external resource may not scale at all or as fast as a Beam Runner. The external
- * resource itself may be an API with its own quota that {@link ThrottleWithExternalResource} does
- * not consider.
- *
- * <p>{@link ThrottleWithExternalResource} makes use of several {@link Caller}s that work together
- * to achieve its aim of throttling a {@link T} {@link PCollection}. A {@link RefresherT} is a
- * {@link Caller} that takes an {@link Instant} and refreshes a shared {@link Quota}. An {@link
- * EnqueuerT} enqueues a {@link T} element while a {@link DequeuerT} dequeues said element when the
- * {@link ReporterT} reports that the stored {@link Quota#getNumRequests} is >0. Finally, a {@link
- * DecrementerT} decrements from the shared {@link Quota} value, additionally reporting the value
- * after performing the action.
- *
- * <p>{@link ThrottleWithExternalResource} instantiates and applies two {@link Call} {@link
- * PTransform}s using the aforementioned {@link Caller}s {@link RefresherT} and {@link EnqueuerT}.
- * {@link ThrottleWithExternalResource} calls {@link ReporterT}, {@link DequeuerT}, {@link
- * DecrementerT} within its {@link DoFn}, emitting the dequeued {@link T} when the {@link ReporterT}
- * reports a value >0. As an additional safety check, the DoFn checks whether the {@link Quota}
- * value after {@link DecrementerT}'s action is <0, signaling that multiple workers are attempting
- * the same too fast and therefore exists the DoFn allowing for the next refresh.
- *
- * <p>{@link ThrottleWithExternalResource} flattens errors emitted from {@link EnqueuerT}, {@link
- * RefresherT}, and its own {@link DoFn} into a single {@link ApiIOError} {@link PCollection} that
- * is encapsulated, with a {@link T} {@link PCollection} output into a {@link Result}.
- */
-class ThrottleWithExternalResource<
-        T,
-        ReporterT extends Caller<String, Long> & SetupTeardown,
-        EnqueuerT extends Caller<T, Void> & SetupTeardown,
-        DequeuerT extends Caller<Instant, T> & SetupTeardown,
-        DecrementerT extends Caller<Instant, Long> & SetupTeardown,
-        RefresherT extends Caller<Instant, Void> & SetupTeardown>
-    extends PTransform<PCollection<T>, Result<T>> {
-
-  /**
-   * Instantiate a {@link ThrottleWithExternalResource} using a {@link RedisClient}.
-   *
-   * <p><a href="https://redis.io">Redis</a> is designed for multiple workloads, simultaneously
-   * reading and writing to a shared instance. See <a
-   * href="https://redis.io/docs/get-started/faq/">Redis FAQ</a> for more information on important
-   * considerations when using Redis as {@link ThrottleWithExternalResource}'s external resource.
-   */
-  static <T>
-      ThrottleWithExternalResource<
-              T,
-              RedisReporter,
-              RedisEnqueuer<T>,
-              RedisDequeuer<T>,
-              RedisDecrementer,
-              RedisRefresher>
-          usingRedis(URI uri, String quotaIdentifier, String queueKey, Quota quota, Coder<T> coder)
-              throws Coder.NonDeterministicException {
-    return new ThrottleWithExternalResource<
-        T, RedisReporter, RedisEnqueuer<T>, RedisDequeuer<T>, RedisDecrementer, RedisRefresher>(
-        quota,
-        quotaIdentifier,
-        coder,
-        new RedisReporter(uri),
-        new RedisEnqueuer<>(uri, queueKey, coder),
-        new RedisDequeuer<>(uri, coder, queueKey),
-        new RedisDecrementer(uri, queueKey),
-        new RedisRefresher(uri, quota, quotaIdentifier));
-  }
-
-  private static final Duration THROTTLE_INTERVAL = Duration.standardSeconds(1L);
-
-  private final Quota quota;
-  private final String quotaIdentifier;
-  private final Coder<T> coder;
-  private final ReporterT reporterT;
-  private final EnqueuerT enqueuerT;
-  private final DequeuerT dequeuerT;
-  private final DecrementerT decrementerT;
-  private final RefresherT refresherT;
-
-  ThrottleWithExternalResource(
-      Quota quota,
-      String quotaIdentifier,
-      Coder<T> coder,
-      ReporterT reporterT,
-      EnqueuerT enqueuerT,
-      DequeuerT dequeuerT,
-      DecrementerT decrementerT,
-      RefresherT refresherT)
-      throws Coder.NonDeterministicException {
-    this.quotaIdentifier = quotaIdentifier;
-    this.reporterT = reporterT;
-    coder.verifyDeterministic();
-    checkArgument(!quotaIdentifier.isEmpty());
-    this.quota = quota;
-    this.coder = coder;
-    this.enqueuerT = enqueuerT;
-    this.dequeuerT = dequeuerT;
-    this.decrementerT = decrementerT;
-    this.refresherT = refresherT;
-  }
-
-  @Override
-  public Result<T> expand(PCollection<T> input) {
-    Pipeline pipeline = input.getPipeline();
-
-    // Refresh known quota to control the throttle rate.
-    Result<Void> refreshResult =
-        pipeline
-            .apply("quota impulse", PeriodicImpulse.create().withInterval(quota.getInterval()))
-            .apply("quota refresh", getRefresher());
-
-    // Enqueue T elements.
-    Result<Void> enqueuResult = input.apply("enqueue", getEnqueuer());
-
-    TupleTag<T> outputTag = new TupleTag<T>() {};
-    TupleTag<ApiIOError> failureTag = new TupleTag<ApiIOError>() {};
-
-    // Perform Throttle.
-    PCollectionTuple pct =
-        pipeline
-            .apply("throttle impulse", PeriodicImpulse.create().withInterval(THROTTLE_INTERVAL))
-            .apply(
-                "throttle fn",
-                ParDo.of(
-                        new ThrottleFn(
-                            quotaIdentifier,
-                            dequeuerT,
-                            decrementerT,
-                            reporterT,
-                            outputTag,
-                            failureTag))
-                    .withOutputTags(outputTag, TupleTagList.of(failureTag)));
-
-    PCollection<ApiIOError> errors =
-        PCollectionList.of(refreshResult.getFailures())
-            .and(enqueuResult.getFailures())
-            .and(pct.get(failureTag))
-            .apply("errors flatten", Flatten.pCollections());
-
-    TupleTag<T> resultOutputTag = new TupleTag<T>() {};
-    TupleTag<ApiIOError> resultFailureTag = new TupleTag<ApiIOError>() {};
-
-    return Result.<T>of(
-        coder,
-        resultOutputTag,
-        resultFailureTag,
-        PCollectionTuple.of(resultOutputTag, pct.get(outputTag)).and(resultFailureTag, errors));
-  }
-
-  private Call<Instant, Void> getRefresher() {
-    return Call.ofCallerAndSetupTeardown(refresherT, VoidCoder.of());
-  }
-
-  private Call<T, Void> getEnqueuer() {
-    return Call.ofCallerAndSetupTeardown(enqueuerT, VoidCoder.of());
-  }
-
-  private class ThrottleFn extends DoFn<Instant, T> {
-    private final String quotaIdentifier;
-    private final DequeuerT dequeuerT;
-    private final DecrementerT decrementerT;
-    private final ReporterT reporterT;
-    private final TupleTag<T> outputTag;
-    private final TupleTag<ApiIOError> failureTag;
-
-    private ThrottleFn(
-        String quotaIdentifier,
-        DequeuerT dequeuerT,
-        DecrementerT decrementerT,
-        ReporterT reporterT,
-        TupleTag<T> outputTag,
-        TupleTag<ApiIOError> failureTag) {
-      this.quotaIdentifier = quotaIdentifier;
-      this.dequeuerT = dequeuerT;
-      this.decrementerT = decrementerT;
-      this.reporterT = reporterT;
-      this.outputTag = outputTag;
-      this.failureTag = failureTag;
-    }
-
-    @ProcessElement
-    public void process(@Element Instant instant, MultiOutputReceiver receiver) {
-      // Check for available quota.
-      try {
-        if (reporterT.call(quotaIdentifier) <= 0L) {
-          return;
-        }
-
-        // Decrement the quota.
-        Long quotaAfterDecrement = decrementerT.call(instant);
-
-        // As an additional protection we check what the quota is after decrementing. A value
-        // < 0 signals that multiple simultaneous workers have attempted to decrement too quickly.
-        // We don't bother adding the quota back to prevent additional workers from doing the same
-        // and just wait for the next refresh, exiting the DoFn.
-        if (quotaAfterDecrement < 0) {
-          return;
-        }
-
-        // Dequeue an element if quota available. An error here would not result in loss of data
-        // as no element would successfully dequeue from the external resource but instead throw.
-        T element = dequeuerT.call(instant);
-
-        // Finally, emit the element.
-        receiver.get(outputTag).output(element);
-
-      } catch (UserCodeExecutionException e) {
-        receiver
-            .get(failureTag)
-            .output(
-                ApiIOError.builder()
-                    // no request to emit as part of the error.
-                    .setRequestAsJsonString("")
-                    .setMessage(Optional.ofNullable(e.getMessage()).orElse(""))
-                    .setObservedTimestamp(Instant.now())
-                    .setStackTrace(Throwables.getStackTraceAsString(e))
-                    .build());
-      }
-    }
-
-    @Setup
-    public void setup() throws UserCodeExecutionException {
-      enqueuerT.setup();
-      dequeuerT.setup();
-      decrementerT.setup();
-      reporterT.setup();
-    }
-
-    @Teardown
-    public void teardown() throws UserCodeExecutionException {
-      List<String> messages = new ArrayList<>();
-      String format = "%s encountered error during teardown: %s";
-      try {
-        enqueuerT.teardown();
-      } catch (UserCodeExecutionException e) {
-        messages.add(String.format(format, "enqueuerT", e));
-      }
-      try {
-        dequeuerT.teardown();
-      } catch (UserCodeExecutionException e) {
-        messages.add(String.format(format, "dequeuerT", e));
-      }
-      try {
-        decrementerT.teardown();
-      } catch (UserCodeExecutionException e) {
-        messages.add(String.format(format, "decrementerT", e));
-      }
-      try {
-        reporterT.teardown();
-      } catch (UserCodeExecutionException e) {
-        messages.add(String.format(format, "reporterT", e));
-      }
-
-      if (!messages.isEmpty()) {
-        throw new UserCodeExecutionException(String.join("; ", messages));
-      }
-    }
-  }
-
-  private static class RedisReporter extends RedisSetupTeardown implements Caller<String, Long> {
-    private RedisReporter(URI uri) {
-      super(new RedisClient(uri));
-    }
-
-    @Override
-    public Long call(String request) throws UserCodeExecutionException {
-      return client.getLong(request);
-    }
-  }
-
-  private static class RedisEnqueuer<T> extends RedisSetupTeardown implements Caller<T, Void> {
-    private final String key;
-    private final Coder<T> coder;
-
-    private RedisEnqueuer(URI uri, String key, Coder<T> coder) {
-      super(new RedisClient(uri));
-      this.key = key;
-      this.coder = coder;
-    }
-
-    @Override
-    public Void call(T request) throws UserCodeExecutionException {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      try {
-        coder.encode(request, baos);
-      } catch (IOException e) {
-        throw new UserCodeExecutionException(e);
-      }
-      client.rpush(key, baos.toByteArray());
-      return null;
-    }
-  }
-
-  private static class RedisDequeuer<T> extends RedisSetupTeardown implements Caller<Instant, T> {
-
-    private final Coder<T> coder;
-    private final String key;
-
-    private RedisDequeuer(URI uri, Coder<T> coder, String key) {
-      super(new RedisClient(uri));
-      this.coder = coder;
-      this.key = key;
-    }
-
-    @Override
-    public T call(Instant request) throws UserCodeExecutionException {
-      byte[] bytes = client.lpop(key);
-      try {
-        return checkStateNotNull(coder.decode(ByteSource.wrap(bytes).openStream()));
-
-      } catch (IOException e) {
-        throw new UserCodeExecutionException(e);
-      }
-    }
-  }
-
-  private static class RedisDecrementer extends RedisSetupTeardown
-      implements Caller<Instant, Long> {
-
-    private final String key;
-
-    private RedisDecrementer(URI uri, String key) {
-      super(new RedisClient(uri));
-      this.key = key;
-    }
-
-    @Override
-    public Long call(Instant request) throws UserCodeExecutionException {
-      return client.decr(key);
-    }
-  }
-
-  private static class RedisRefresher extends RedisSetupTeardown implements Caller<Instant, Void> {
-    private final Quota quota;
-    private final String key;
-
-    private RedisRefresher(URI uri, Quota quota, String key) {
-      super(new RedisClient(uri));
-      this.quota = quota;
-      this.key = key;
-    }
-
-    @Override
-    public Void call(Instant request) throws UserCodeExecutionException {
-      client.setex(key, quota.getNumRequests(), quota.getInterval());
-      return null;
-    }
-  }
-
-  private abstract static class RedisSetupTeardown implements SetupTeardown {
-    protected final RedisClient client;
-
-    private RedisSetupTeardown(RedisClient client) {
-      this.client = client;
-    }
-
-    @Override
-    public void setup() throws UserCodeExecutionException {
-      client.setup();
-    }
-
-    @Override
-    public void teardown() throws UserCodeExecutionException {
-      client.teardown();
-    }
-  }
-}
diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithoutExternalResource.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithoutExternalResource.java
deleted file mode 100644
index 0648a86f28e..00000000000
--- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithoutExternalResource.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.io.requestresponse;
-
-import com.google.auto.value.AutoValue;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * {@link ThrottleWithoutExternalResource} throttles a {@link RequestT} {@link PCollection} emitting
- * a {@link RequestT} {@link PCollection} at a maximally configured rate, without using an external
- * resource.
- */
-// TODO(damondouglas): expand what "without external resource" means with respect to "with external
-//   resource" when the other throttle transforms implemented.
-//   See: https://github.com/apache/beam/issues/28932
-class ThrottleWithoutExternalResource<RequestT>
-    extends PTransform<PCollection<RequestT>, PCollection<RequestT>> {
-
-  // TODO(damondouglas): remove suppress warnings when finally utilized in a future PR.
-  @SuppressWarnings({"unused"})
-  private final Configuration<RequestT> configuration;
-
-  private ThrottleWithoutExternalResource(Configuration<RequestT> configuration) {
-    this.configuration = configuration;
-  }
-
-  @Override
-  public PCollection<RequestT> expand(PCollection<RequestT> input) {
-    // TODO(damondouglas): expand in a future PR.
-    return input;
-  }
-
-  @AutoValue
-  abstract static class Configuration<RequestT> {
-
-    @AutoValue.Builder
-    abstract static class Builder<RequestT> {
-      abstract Configuration<RequestT> build();
-    }
-  }
-}
diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT.java
deleted file mode 100644
index 00cee0a4935..00000000000
--- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.io.requestresponse;
-
-import static org.apache.beam.io.requestresponse.EchoITOptions.GRPC_ENDPOINT_ADDRESS_NAME;
-import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
-import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
-import static org.apache.beam.sdk.values.TypeDescriptors.strings;
-
-import com.google.protobuf.ByteString;
-import java.net.URI;
-import java.util.UUID;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Distinct;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PeriodicImpulse;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoRequest;
-import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoResponse;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
-import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
-import org.joda.time.Duration;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.utility.DockerImageName;
-
-/**
- * Integration tests for {@link ThrottleWithExternalResource}. See {@link EchoITOptions} for details
- * on the required parameters and how to provide these for running integration tests.
- */
-public class ThrottleWithExternalResourceIT {
-
-  @Rule public TestPipeline pipeline = TestPipeline.create();
-
-  private static final String QUOTA_ID = "echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota";
-  private static final Quota QUOTA = new Quota(1L, Duration.standardSeconds(1L));
-  private static final ByteString PAYLOAD = ByteString.copyFromUtf8("payload");
-  private static @MonotonicNonNull EchoITOptions options;
-  private static @MonotonicNonNull EchoGRPCCallerWithSetupTeardown client;
-  private static final String CONTAINER_IMAGE_NAME = "redis:5.0.3-alpine";
-  private static final Integer PORT = 6379;
-  private static final EchoRequestCoder REQUEST_CODER = new EchoRequestCoder();
-  private static final Coder<EchoResponse> RESPONSE_CODER =
-      SerializableCoder.of(TypeDescriptor.of(EchoResponse.class));
-
-  @Rule
-  public GenericContainer<?> redis =
-      new GenericContainer<>(DockerImageName.parse(CONTAINER_IMAGE_NAME)).withExposedPorts(PORT);
-
-  @BeforeClass
-  public static void setUp() throws UserCodeExecutionException {
-    options = readIOTestPipelineOptions(EchoITOptions.class);
-    if (Strings.isNullOrEmpty(options.getGrpcEndpointAddress())) {
-      throw new RuntimeException(
-          "--"
-              + GRPC_ENDPOINT_ADDRESS_NAME
-              + " is missing. See "
-              + EchoITOptions.class
-              + " for details.");
-    }
-    client = EchoGRPCCallerWithSetupTeardown.of(URI.create(options.getGrpcEndpointAddress()));
-    checkStateNotNull(client).setup();
-
-    try {
-      client.call(createRequest());
-    } catch (UserCodeExecutionException e) {
-      if (e instanceof UserCodeQuotaException) {
-        throw new RuntimeException(
-            String.format(
-                "The quota: %s is set to refresh on an interval. Unless there are failures in this test, wait for a few seconds before running the test again.",
-                QUOTA_ID),
-            e);
-      }
-      throw e;
-    }
-  }
-
-  @AfterClass
-  public static void tearDown() throws UserCodeExecutionException {
-    checkStateNotNull(client).teardown();
-  }
-
-  @Test
-  public void givenThrottleUsingRedis_preventsQuotaErrors() throws NonDeterministicException {
-    URI uri =
-        URI.create(String.format("redis://%s:%d", redis.getHost(), redis.getFirstMappedPort()));
-    pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);
-
-    Result<EchoRequest> throttleResult =
-        createRequestStream()
-            .apply(
-                "throttle",
-                ThrottleWithExternalResource.usingRedis(
-                    uri, QUOTA_ID, UUID.randomUUID().toString(), QUOTA, REQUEST_CODER));
-
-    // Assert that we are not getting any errors and successfully emitting throttled elements.
-    PAssert.that(throttleResult.getFailures()).empty();
-    PAssert.thatSingleton(
-            throttleResult
-                .getResponses()
-                .apply(
-                    "window throttled", Window.into(FixedWindows.of(Duration.standardSeconds(1L))))
-                .apply(
-                    "count throttled",
-                    Combine.globally(Count.<EchoRequest>combineFn()).withoutDefaults()))
-        .notEqualTo(0L);
-
-    // Assert that all the throttled data is not corrupted.
-    PAssert.that(
-            throttleResult
-                .getResponses()
-                .apply(
-                    "window throttled before extraction",
-                    Window.into(FixedWindows.of(Duration.standardSeconds(1L))))
-                .apply(
-                    "extract request id",
-                    MapElements.into(strings()).via(input -> checkStateNotNull(input).getId()))
-                .apply("distinct", Distinct.create()))
-        .containsInAnyOrder(QUOTA_ID);
-
-    // Call the Echo service with throttled requests.
-    Result<EchoResponse> echoResult =
-        throttleResult
-            .getResponses()
-            .apply("call", Call.ofCallerAndSetupTeardown(client, RESPONSE_CODER));
-
-    // Assert that there were no errors.
-    PAssert.that(echoResult.getFailures()).empty();
-
-    // Assert that the responses match the requests.
-    PAssert.that(
-            echoResult
-                .getResponses()
-                .apply(
-                    "window responses before extraction",
-                    Window.into(FixedWindows.of(Duration.standardSeconds(1L))))
-                .apply(
-                    "extract response id",
-                    MapElements.into(strings()).via(input -> checkStateNotNull(input).getId())))
-        .containsInAnyOrder(QUOTA_ID);
-
-    PipelineResult job = pipeline.run();
-    job.waitUntilFinish(Duration.standardSeconds(3L));
-  }
-
-  private static EchoRequest createRequest() {
-    return EchoRequest.newBuilder().setId(QUOTA_ID).setPayload(PAYLOAD).build();
-  }
-
-  private PCollection<EchoRequest> createRequestStream() {
-    return pipeline
-        .apply("impulse", PeriodicImpulse.create().withInterval(Duration.millis(10L)))
-        .apply(
-            "requests",
-            MapElements.into(TypeDescriptor.of(EchoRequest.class)).via(ignored -> createRequest()));
-  }
-}
diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceTest.java
deleted file mode 100644
index 591ba923201..00000000000
--- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.io.requestresponse;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.assertThrows;
-
-import java.net.URI;
-import org.apache.beam.io.requestresponse.CallTest.Request;
-import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException;
-import org.joda.time.Duration;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link ThrottleWithExternalResource}. */
-@RunWith(JUnit4.class)
-public class ThrottleWithExternalResourceTest {
-  @Rule public TestPipeline pipeline = TestPipeline.create();
-
-  @Test
-  public void givenNonDeterministicCoder_usingRedis_throwsError() throws NonDeterministicException {
-    URI uri = URI.create("redis://localhost:6379");
-    String quotaIdentifier = "quota";
-    String queueKey = "queue";
-    Quota quota = new Quota(10L, Duration.standardSeconds(1L));
-
-    assertThrows(
-        NonDeterministicException.class,
-        () ->
-            ThrottleWithExternalResource.usingRedis(
-                uri, quotaIdentifier, queueKey, quota, CallTest.NON_DETERMINISTIC_REQUEST_CODER));
-
-    ThrottleWithExternalResource.usingRedis(
-        uri, quotaIdentifier, queueKey, quota, CallTest.DETERMINISTIC_REQUEST_CODER);
-  }
-
-  @Test
-  public void givenWrongRedisURI_throwsError() throws NonDeterministicException {
-    URI uri = URI.create("redis://1.2.3.4:6379");
-    String quotaIdentifier = "quota";
-    String queueKey = "queue";
-    Quota quota = new Quota(10L, Duration.standardSeconds(1L));
-    PCollection<Request> requests =
-        pipeline.apply(Create.of(new Request(""))).setCoder(CallTest.DETERMINISTIC_REQUEST_CODER);
-    requests.apply(
-        ThrottleWithExternalResource.usingRedis(
-            uri, quotaIdentifier, queueKey, quota, requests.getCoder()));
-
-    UncheckedExecutionException error =
-        assertThrows(UncheckedExecutionException.class, pipeline::run);
-    assertThat(
-        error.getCause().getMessage(),
-        containsString("Failed to connect to host: redis://1.2.3.4:6379"));
-  }
-}