You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2024/02/27 18:36:59 UTC
(beam) branch master updated: Remove unused code (#30432)
This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5af76b0de1a Remove unused code (#30432)
5af76b0de1a is described below
commit 5af76b0de1a2fb083e51d6f9be54f0989aabb0d0
Author: Damon <da...@users.noreply.github.com>
AuthorDate: Tue Feb 27 10:36:53 2024 -0800
Remove unused code (#30432)
---
sdks/java/io/rrio/build.gradle | 2 -
.../apache/beam/io/requestresponse/ApiIOError.java | 25 +-
.../org/apache/beam/io/requestresponse/Call.java | 4 +-
.../org/apache/beam/io/requestresponse/Quota.java | 69 ----
.../beam/io/requestresponse/RequestResponseIO.java | 53 ---
.../ThrottleWithExternalResource.java | 418 ---------------------
.../ThrottleWithoutExternalResource.java | 57 ---
.../ThrottleWithExternalResourceIT.java | 186 ---------
.../ThrottleWithExternalResourceTest.java | 77 ----
9 files changed, 11 insertions(+), 880 deletions(-)
diff --git a/sdks/java/io/rrio/build.gradle b/sdks/java/io/rrio/build.gradle
index 4ecdf4e91df..9d51df4c1dc 100644
--- a/sdks/java/io/rrio/build.gradle
+++ b/sdks/java/io/rrio/build.gradle
@@ -34,8 +34,6 @@ dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.joda_time
implementation library.java.vendored_guava_32_1_2_jre
- implementation library.java.jackson_core
- implementation library.java.jackson_databind
implementation "redis.clients:jedis:$jedisVersion"
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ApiIOError.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ApiIOError.java
index abb25bd33ba..5a3663fb103 100644
--- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ApiIOError.java
+++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ApiIOError.java
@@ -17,10 +17,6 @@
*/
package org.apache.beam.io.requestresponse;
-import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auto.value.AutoValue;
import java.util.Optional;
import org.apache.beam.sdk.schemas.AutoValueSchema;
@@ -36,19 +32,18 @@ import org.joda.time.Instant;
@AutoValue
public abstract class ApiIOError {
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
/**
* Instantiate an {@link ApiIOError} from an {@link ErrorT} {@link T} element. The {@link T}
- * element is converted to a JSON string.
+ * element is converted to a string by calling {@link Object#toString()}.
*/
- static <T, ErrorT extends Exception> ApiIOError of(ErrorT e, T element)
- throws JsonProcessingException {
-
- String json = OBJECT_MAPPER.writeValueAsString(checkStateNotNull(element));
+ static <T, ErrorT extends Exception> ApiIOError of(ErrorT e, T element) {
+ String request = "";
+ if (element != null) {
+ request = element.toString();
+ }
return ApiIOError.builder()
- .setRequestAsJsonString(json)
+ .setRequestAsString(request)
.setMessage(Optional.ofNullable(e.getMessage()).orElse(""))
.setObservedTimestamp(Instant.now())
.setStackTrace(Throwables.getStackTraceAsString(e))
@@ -59,8 +54,8 @@ public abstract class ApiIOError {
return new AutoValue_ApiIOError.Builder();
}
- /** The JSON string representation of the request associated with the error. */
- public abstract String getRequestAsJsonString();
+ /** The string representation of the request associated with the error. */
+ public abstract String getRequestAsString();
/** The observed timestamp of the error. */
public abstract Instant getObservedTimestamp();
@@ -74,7 +69,7 @@ public abstract class ApiIOError {
@AutoValue.Builder
abstract static class Builder {
- abstract Builder setRequestAsJsonString(String value);
+ abstract Builder setRequestAsString(String value);
abstract Builder setObservedTimestamp(Instant value);
diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
index 65038a8ffa3..b6941f8fcbb 100644
--- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
+++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
@@ -20,7 +20,6 @@ package org.apache.beam.io.requestresponse;
import static org.apache.beam.io.requestresponse.Monitoring.incIfPresent;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
@@ -302,8 +301,7 @@ class Call<RequestT, ResponseT> extends PTransform<PCollection<RequestT>, Result
}
@ProcessElement
- public void process(@Element RequestT request, MultiOutputReceiver receiver)
- throws JsonProcessingException {
+ public void process(@Element RequestT request, MultiOutputReceiver receiver) {
BackOff backOff = configuration.getBackOffSupplier().get();
Sleeper sleeper = configuration.getSleeperSupplier().get();
diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Quota.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Quota.java
deleted file mode 100644
index 1adc46b836a..00000000000
--- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Quota.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.io.requestresponse;
-
-import java.io.Serializable;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import org.joda.time.Duration;
-
-/**
- * A data class that expresses a quota. Web API providers typically define a quota as the number of
- * requests per time interval.
- */
-public class Quota implements Serializable {
- private final long numRequests;
- private final Duration interval;
-
- public Quota(long numRequests, Duration interval) {
- this.numRequests = numRequests;
- this.interval = interval;
- }
-
- /** The number of allowed requests. */
- public long getNumRequests() {
- return numRequests;
- }
-
- /** The duration context within which to allow requests. */
- public Duration getInterval() {
- return interval;
- }
-
- @Override
- public boolean equals(@Nullable Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Quota quota = (Quota) o;
- return numRequests == quota.numRequests && Objects.equal(interval, quota.interval);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(numRequests, interval);
- }
-
- @Override
- public String toString() {
- return "Quota{" + "numRequests=" + numRequests + ", interval=" + interval + '}';
- }
-}
diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java
index b7338cb64ff..9c5c6128c29 100644
--- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java
+++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java
@@ -107,8 +107,6 @@ public class RequestResponseIO<RequestT, ResponseT>
private static final String CACHE_WRITE_NAME = "CacheWrite";
- private static final String THROTTLE_NAME = "Throttle";
-
private final TupleTag<ResponseT> responseTag = new TupleTag<ResponseT>() {};
private final TupleTag<ApiIOError> failureTag = new TupleTag<ApiIOError>() {};
@@ -261,18 +259,6 @@ public class RequestResponseIO<RequestT, ResponseT>
rrioConfiguration, callConfiguration.toBuilder().setMonitoringConfiguration(value).build());
}
- /**
- * Configures {@link RequestResponseIO} with a {@link PTransform} that holds back {@link
- * RequestT}s to prevent quota errors such as HTTP 429 or gRPC RESOURCE_EXHAUSTED errors.
- */
- // TODO(damondouglas): Until https://github.com/apache/beam/issues/28930 there is no provided
- // solution for this, however this method allows users to provide their own at this time.
- public RequestResponseIO<RequestT, ResponseT> withPreventiveThrottle(
- PTransform<PCollection<RequestT>, Result<RequestT>> throttle) {
- return new RequestResponseIO<>(
- rrioConfiguration.toBuilder().setThrottle(throttle).build(), callConfiguration);
- }
-
/** Exposes the transform's {@link Call.Configuration} for testing. */
@VisibleForTesting
Call.Configuration<RequestT, ResponseT> getCallConfiguration() {
@@ -308,9 +294,6 @@ public class RequestResponseIO<RequestT, ResponseT>
PCollection<KV<RequestT, ResponseT>>, Result<KV<RequestT, ResponseT>>>
getCacheWrite();
- /** Throttles a {@link RequestT} {@link PCollection}. */
- abstract @Nullable PTransform<PCollection<RequestT>, Result<RequestT>> getThrottle();
-
abstract Builder<RequestT, ResponseT> toBuilder();
@AutoValue.Builder
@@ -327,10 +310,6 @@ public class RequestResponseIO<RequestT, ResponseT>
abstract Builder<RequestT, ResponseT> setCacheWrite(
PTransform<PCollection<KV<RequestT, ResponseT>>, Result<KV<RequestT, ResponseT>>> value);
- /** See {@link #getThrottle}. */
- abstract Builder<RequestT, ResponseT> setThrottle(
- PTransform<PCollection<RequestT>, Result<RequestT>> value);
-
abstract Configuration<RequestT, ResponseT> build();
}
}
@@ -349,12 +328,6 @@ public class RequestResponseIO<RequestT, ResponseT>
responseList = cacheRead.getMiddle();
failureList = cacheRead.getRight();
- // Throttle the RequestT input PCollection.
- Pair<PCollection<RequestT>, PCollectionList<ApiIOError>> throttle =
- expandThrottle(input, failureList);
- input = throttle.getLeft();
- failureList = throttle.getRight();
-
// Invoke the Caller for each RequestT input and write associated RequestT and ResponseT to
// the cache, if available.
Pair<PCollectionList<ResponseT>, PCollectionList<ApiIOError>> call =
@@ -423,32 +396,6 @@ public class RequestResponseIO<RequestT, ResponseT>
return Triple.of(input, responseList, failureList);
}
- /**
- * Expands with {@link Configuration#getThrottle}, if available. Otherwise, returns a {@link Pair}
- * of original arguments.
- *
- * <pre>Algorithm is as follows:
- * <ol>
- * <li>Applies throttle transform to {@link RequestT} {@link PCollection} input.</li>
- * <li>Returns throttled {@link PCollection} of {@link RequestT} elements.</li>
- * <li>Returns appended {@link PCollection} of {@link ApiIOError}s to the failureList.</li>
- * </ol></pre>
- */
- // TODO(damondouglas): See https://github.com/apache/beam/issues/28930; currently there is no
- // provided solution for this, though users could provide their own via withPreventiveThrottle.
- Pair<PCollection<RequestT>, PCollectionList<ApiIOError>> expandThrottle(
- PCollection<RequestT> input, PCollectionList<ApiIOError> failureList) {
-
- if (rrioConfiguration.getThrottle() == null) {
- return Pair.of(input, failureList);
- }
-
- Result<RequestT> throttleResult =
- input.apply(THROTTLE_NAME, checkStateNotNull(rrioConfiguration.getThrottle()));
-
- return Pair.of(throttleResult.getResponses(), failureList.and(throttleResult.getFailures()));
- }
-
/**
* Expands with a {@link Call} and {@link Configuration#getCacheWrite}, if available.
*
diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResource.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResource.java
deleted file mode 100644
index 808f0b5d963..00000000000
--- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResource.java
+++ /dev/null
@@ -1,418 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.io.requestresponse;
-
-import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.PeriodicImpulse;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteSource;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Throttles a {@link T} {@link PCollection} using an external resource.
- *
- * <p>{@link ThrottleWithExternalResource} makes use of {@link PeriodicImpulse} as it needs to
- * coordinate three {@link PTransform}s concurrently. Usage of {@link ThrottleWithExternalResource}
- * should consider the impact of {@link PeriodicImpulse} on the pipeline.
- *
- * <p>Usage of {@link ThrottleWithExternalResource} is completely optional and serves as one of many
- * methods by {@link RequestResponseIO} to protect against API overuse. Usage should not depend on
- * {@link ThrottleWithExternalResource} alone to achieve API overuse prevention for several reasons.
- * The underlying external resource may not scale at all or as fast as a Beam Runner. The external
- * resource itself may be an API with its own quota that {@link ThrottleWithExternalResource} does
- * not consider.
- *
- * <p>{@link ThrottleWithExternalResource} makes use of several {@link Caller}s that work together
- * to achieve its aim of throttling a {@link T} {@link PCollection}. A {@link RefresherT} is a
- * {@link Caller} that takes an {@link Instant} and refreshes a shared {@link Quota}. An {@link
- * EnqueuerT} enqueues a {@link T} element while a {@link DequeuerT} dequeues said element when the
- * {@link ReporterT} reports that the stored {@link Quota#getNumRequests} is >0. Finally, a {@link
- * DecrementerT} decrements from the shared {@link Quota} value, additionally reporting the value
- * after performing the action.
- *
- * <p>{@link ThrottleWithExternalResource} instantiates and applies two {@link Call} {@link
- * PTransform}s using the aforementioned {@link Caller}s {@link RefresherT} and {@link EnqueuerT}.
- * {@link ThrottleWithExternalResource} calls {@link ReporterT}, {@link DequeuerT}, {@link
- * DecrementerT} within its {@link DoFn}, emitting the dequeued {@link T} when the {@link ReporterT}
- * reports a value >0. As an additional safety check, the DoFn checks whether the {@link Quota}
- * value after {@link DecrementerT}'s action is <0, signaling that multiple workers are attempting
- * the same too fast and therefore exits the DoFn, allowing for the next refresh.
- *
- * <p>{@link ThrottleWithExternalResource} flattens errors emitted from {@link EnqueuerT}, {@link
- * RefresherT}, and its own {@link DoFn} into a single {@link ApiIOError} {@link PCollection} that
- * is encapsulated, with a {@link T} {@link PCollection} output into a {@link Result}.
- */
-class ThrottleWithExternalResource<
- T,
- ReporterT extends Caller<String, Long> & SetupTeardown,
- EnqueuerT extends Caller<T, Void> & SetupTeardown,
- DequeuerT extends Caller<Instant, T> & SetupTeardown,
- DecrementerT extends Caller<Instant, Long> & SetupTeardown,
- RefresherT extends Caller<Instant, Void> & SetupTeardown>
- extends PTransform<PCollection<T>, Result<T>> {
-
- /**
- * Instantiate a {@link ThrottleWithExternalResource} using a {@link RedisClient}.
- *
- * <p><a href="https://redis.io">Redis</a> is designed for multiple workloads, simultaneously
- * reading and writing to a shared instance. See <a
- * href="https://redis.io/docs/get-started/faq/">Redis FAQ</a> for more information on important
- * considerations when using Redis as {@link ThrottleWithExternalResource}'s external resource.
- */
- static <T>
- ThrottleWithExternalResource<
- T,
- RedisReporter,
- RedisEnqueuer<T>,
- RedisDequeuer<T>,
- RedisDecrementer,
- RedisRefresher>
- usingRedis(URI uri, String quotaIdentifier, String queueKey, Quota quota, Coder<T> coder)
- throws Coder.NonDeterministicException {
- return new ThrottleWithExternalResource<
- T, RedisReporter, RedisEnqueuer<T>, RedisDequeuer<T>, RedisDecrementer, RedisRefresher>(
- quota,
- quotaIdentifier,
- coder,
- new RedisReporter(uri),
- new RedisEnqueuer<>(uri, queueKey, coder),
- new RedisDequeuer<>(uri, coder, queueKey),
- new RedisDecrementer(uri, queueKey),
- new RedisRefresher(uri, quota, quotaIdentifier));
- }
-
- private static final Duration THROTTLE_INTERVAL = Duration.standardSeconds(1L);
-
- private final Quota quota;
- private final String quotaIdentifier;
- private final Coder<T> coder;
- private final ReporterT reporterT;
- private final EnqueuerT enqueuerT;
- private final DequeuerT dequeuerT;
- private final DecrementerT decrementerT;
- private final RefresherT refresherT;
-
- ThrottleWithExternalResource(
- Quota quota,
- String quotaIdentifier,
- Coder<T> coder,
- ReporterT reporterT,
- EnqueuerT enqueuerT,
- DequeuerT dequeuerT,
- DecrementerT decrementerT,
- RefresherT refresherT)
- throws Coder.NonDeterministicException {
- this.quotaIdentifier = quotaIdentifier;
- this.reporterT = reporterT;
- coder.verifyDeterministic();
- checkArgument(!quotaIdentifier.isEmpty());
- this.quota = quota;
- this.coder = coder;
- this.enqueuerT = enqueuerT;
- this.dequeuerT = dequeuerT;
- this.decrementerT = decrementerT;
- this.refresherT = refresherT;
- }
-
- @Override
- public Result<T> expand(PCollection<T> input) {
- Pipeline pipeline = input.getPipeline();
-
- // Refresh known quota to control the throttle rate.
- Result<Void> refreshResult =
- pipeline
- .apply("quota impulse", PeriodicImpulse.create().withInterval(quota.getInterval()))
- .apply("quota refresh", getRefresher());
-
- // Enqueue T elements.
- Result<Void> enqueuResult = input.apply("enqueue", getEnqueuer());
-
- TupleTag<T> outputTag = new TupleTag<T>() {};
- TupleTag<ApiIOError> failureTag = new TupleTag<ApiIOError>() {};
-
- // Perform Throttle.
- PCollectionTuple pct =
- pipeline
- .apply("throttle impulse", PeriodicImpulse.create().withInterval(THROTTLE_INTERVAL))
- .apply(
- "throttle fn",
- ParDo.of(
- new ThrottleFn(
- quotaIdentifier,
- dequeuerT,
- decrementerT,
- reporterT,
- outputTag,
- failureTag))
- .withOutputTags(outputTag, TupleTagList.of(failureTag)));
-
- PCollection<ApiIOError> errors =
- PCollectionList.of(refreshResult.getFailures())
- .and(enqueuResult.getFailures())
- .and(pct.get(failureTag))
- .apply("errors flatten", Flatten.pCollections());
-
- TupleTag<T> resultOutputTag = new TupleTag<T>() {};
- TupleTag<ApiIOError> resultFailureTag = new TupleTag<ApiIOError>() {};
-
- return Result.<T>of(
- coder,
- resultOutputTag,
- resultFailureTag,
- PCollectionTuple.of(resultOutputTag, pct.get(outputTag)).and(resultFailureTag, errors));
- }
-
- private Call<Instant, Void> getRefresher() {
- return Call.ofCallerAndSetupTeardown(refresherT, VoidCoder.of());
- }
-
- private Call<T, Void> getEnqueuer() {
- return Call.ofCallerAndSetupTeardown(enqueuerT, VoidCoder.of());
- }
-
- private class ThrottleFn extends DoFn<Instant, T> {
- private final String quotaIdentifier;
- private final DequeuerT dequeuerT;
- private final DecrementerT decrementerT;
- private final ReporterT reporterT;
- private final TupleTag<T> outputTag;
- private final TupleTag<ApiIOError> failureTag;
-
- private ThrottleFn(
- String quotaIdentifier,
- DequeuerT dequeuerT,
- DecrementerT decrementerT,
- ReporterT reporterT,
- TupleTag<T> outputTag,
- TupleTag<ApiIOError> failureTag) {
- this.quotaIdentifier = quotaIdentifier;
- this.dequeuerT = dequeuerT;
- this.decrementerT = decrementerT;
- this.reporterT = reporterT;
- this.outputTag = outputTag;
- this.failureTag = failureTag;
- }
-
- @ProcessElement
- public void process(@Element Instant instant, MultiOutputReceiver receiver) {
- // Check for available quota.
- try {
- if (reporterT.call(quotaIdentifier) <= 0L) {
- return;
- }
-
- // Decrement the quota.
- Long quotaAfterDecrement = decrementerT.call(instant);
-
- // As an additional protection we check what the quota is after decrementing. A value
- // < 0 signals that multiple simultaneous workers have attempted to decrement too quickly.
- // We don't bother adding the quota back to prevent additional workers from doing the same
- // and just wait for the next refresh, exiting the DoFn.
- if (quotaAfterDecrement < 0) {
- return;
- }
-
- // Dequeue an element if quota available. An error here would not result in loss of data
- // as no element would successfully dequeue from the external resource but instead throw.
- T element = dequeuerT.call(instant);
-
- // Finally, emit the element.
- receiver.get(outputTag).output(element);
-
- } catch (UserCodeExecutionException e) {
- receiver
- .get(failureTag)
- .output(
- ApiIOError.builder()
- // no request to emit as part of the error.
- .setRequestAsJsonString("")
- .setMessage(Optional.ofNullable(e.getMessage()).orElse(""))
- .setObservedTimestamp(Instant.now())
- .setStackTrace(Throwables.getStackTraceAsString(e))
- .build());
- }
- }
-
- @Setup
- public void setup() throws UserCodeExecutionException {
- enqueuerT.setup();
- dequeuerT.setup();
- decrementerT.setup();
- reporterT.setup();
- }
-
- @Teardown
- public void teardown() throws UserCodeExecutionException {
- List<String> messages = new ArrayList<>();
- String format = "%s encountered error during teardown: %s";
- try {
- enqueuerT.teardown();
- } catch (UserCodeExecutionException e) {
- messages.add(String.format(format, "enqueuerT", e));
- }
- try {
- dequeuerT.teardown();
- } catch (UserCodeExecutionException e) {
- messages.add(String.format(format, "dequeuerT", e));
- }
- try {
- decrementerT.teardown();
- } catch (UserCodeExecutionException e) {
- messages.add(String.format(format, "decrementerT", e));
- }
- try {
- reporterT.teardown();
- } catch (UserCodeExecutionException e) {
- messages.add(String.format(format, "reporterT", e));
- }
-
- if (!messages.isEmpty()) {
- throw new UserCodeExecutionException(String.join("; ", messages));
- }
- }
- }
-
- private static class RedisReporter extends RedisSetupTeardown implements Caller<String, Long> {
- private RedisReporter(URI uri) {
- super(new RedisClient(uri));
- }
-
- @Override
- public Long call(String request) throws UserCodeExecutionException {
- return client.getLong(request);
- }
- }
-
- private static class RedisEnqueuer<T> extends RedisSetupTeardown implements Caller<T, Void> {
- private final String key;
- private final Coder<T> coder;
-
- private RedisEnqueuer(URI uri, String key, Coder<T> coder) {
- super(new RedisClient(uri));
- this.key = key;
- this.coder = coder;
- }
-
- @Override
- public Void call(T request) throws UserCodeExecutionException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try {
- coder.encode(request, baos);
- } catch (IOException e) {
- throw new UserCodeExecutionException(e);
- }
- client.rpush(key, baos.toByteArray());
- return null;
- }
- }
-
- private static class RedisDequeuer<T> extends RedisSetupTeardown implements Caller<Instant, T> {
-
- private final Coder<T> coder;
- private final String key;
-
- private RedisDequeuer(URI uri, Coder<T> coder, String key) {
- super(new RedisClient(uri));
- this.coder = coder;
- this.key = key;
- }
-
- @Override
- public T call(Instant request) throws UserCodeExecutionException {
- byte[] bytes = client.lpop(key);
- try {
- return checkStateNotNull(coder.decode(ByteSource.wrap(bytes).openStream()));
-
- } catch (IOException e) {
- throw new UserCodeExecutionException(e);
- }
- }
- }
-
- private static class RedisDecrementer extends RedisSetupTeardown
- implements Caller<Instant, Long> {
-
- private final String key;
-
- private RedisDecrementer(URI uri, String key) {
- super(new RedisClient(uri));
- this.key = key;
- }
-
- @Override
- public Long call(Instant request) throws UserCodeExecutionException {
- return client.decr(key);
- }
- }
-
- private static class RedisRefresher extends RedisSetupTeardown implements Caller<Instant, Void> {
- private final Quota quota;
- private final String key;
-
- private RedisRefresher(URI uri, Quota quota, String key) {
- super(new RedisClient(uri));
- this.quota = quota;
- this.key = key;
- }
-
- @Override
- public Void call(Instant request) throws UserCodeExecutionException {
- client.setex(key, quota.getNumRequests(), quota.getInterval());
- return null;
- }
- }
-
- private abstract static class RedisSetupTeardown implements SetupTeardown {
- protected final RedisClient client;
-
- private RedisSetupTeardown(RedisClient client) {
- this.client = client;
- }
-
- @Override
- public void setup() throws UserCodeExecutionException {
- client.setup();
- }
-
- @Override
- public void teardown() throws UserCodeExecutionException {
- client.teardown();
- }
- }
-}
diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithoutExternalResource.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithoutExternalResource.java
deleted file mode 100644
index 0648a86f28e..00000000000
--- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithoutExternalResource.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.io.requestresponse;
-
-import com.google.auto.value.AutoValue;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * {@link ThrottleWithoutExternalResource} throttles a {@link RequestT} {@link PCollection} emitting
- * a {@link RequestT} {@link PCollection} at a maximally configured rate, without using an external
- * resource.
- */
-// TODO(damondouglas): expand what "without external resource" means with respect to "with external
-// resource" when the other throttle transforms are implemented.
-// See: https://github.com/apache/beam/issues/28932
-class ThrottleWithoutExternalResource<RequestT>
- extends PTransform<PCollection<RequestT>, PCollection<RequestT>> {
-
- // TODO(damondouglas): remove suppress warnings when finally utilized in a future PR.
- @SuppressWarnings({"unused"})
- private final Configuration<RequestT> configuration;
-
- private ThrottleWithoutExternalResource(Configuration<RequestT> configuration) {
- this.configuration = configuration;
- }
-
- @Override
- public PCollection<RequestT> expand(PCollection<RequestT> input) {
- // TODO(damondouglas): expand in a future PR.
- return input;
- }
-
- @AutoValue
- abstract static class Configuration<RequestT> {
-
- @AutoValue.Builder
- abstract static class Builder<RequestT> {
- abstract Configuration<RequestT> build();
- }
- }
-}
diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT.java
deleted file mode 100644
index 00cee0a4935..00000000000
--- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.io.requestresponse;
-
-import static org.apache.beam.io.requestresponse.EchoITOptions.GRPC_ENDPOINT_ADDRESS_NAME;
-import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
-import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
-import static org.apache.beam.sdk.values.TypeDescriptors.strings;
-
-import com.google.protobuf.ByteString;
-import java.net.URI;
-import java.util.UUID;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Distinct;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PeriodicImpulse;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoRequest;
-import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoResponse;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
-import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
-import org.joda.time.Duration;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.utility.DockerImageName;
-
-/**
- * Integration tests for {@link ThrottleWithExternalResource}. See {@link EchoITOptions} for details
- * on the required parameters and how to provide these for running integration tests.
- */
-public class ThrottleWithExternalResourceIT {
-
- @Rule public TestPipeline pipeline = TestPipeline.create();
-
- private static final String QUOTA_ID = "echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota";
- private static final Quota QUOTA = new Quota(1L, Duration.standardSeconds(1L));
- private static final ByteString PAYLOAD = ByteString.copyFromUtf8("payload");
- private static @MonotonicNonNull EchoITOptions options;
- private static @MonotonicNonNull EchoGRPCCallerWithSetupTeardown client;
- private static final String CONTAINER_IMAGE_NAME = "redis:5.0.3-alpine";
- private static final Integer PORT = 6379;
- private static final EchoRequestCoder REQUEST_CODER = new EchoRequestCoder();
- private static final Coder<EchoResponse> RESPONSE_CODER =
- SerializableCoder.of(TypeDescriptor.of(EchoResponse.class));
-
- @Rule
- public GenericContainer<?> redis =
- new GenericContainer<>(DockerImageName.parse(CONTAINER_IMAGE_NAME)).withExposedPorts(PORT);
-
- @BeforeClass
- public static void setUp() throws UserCodeExecutionException {
- options = readIOTestPipelineOptions(EchoITOptions.class);
- if (Strings.isNullOrEmpty(options.getGrpcEndpointAddress())) {
- throw new RuntimeException(
- "--"
- + GRPC_ENDPOINT_ADDRESS_NAME
- + " is missing. See "
- + EchoITOptions.class
- + " for details.");
- }
- client = EchoGRPCCallerWithSetupTeardown.of(URI.create(options.getGrpcEndpointAddress()));
- checkStateNotNull(client).setup();
-
- try {
- client.call(createRequest());
- } catch (UserCodeExecutionException e) {
- if (e instanceof UserCodeQuotaException) {
- throw new RuntimeException(
- String.format(
- "The quota: %s is set to refresh on an interval. Unless there are failures in this test, wait for a few seconds before running the test again.",
- QUOTA_ID),
- e);
- }
- throw e;
- }
- }
-
- @AfterClass
- public static void tearDown() throws UserCodeExecutionException {
- checkStateNotNull(client).teardown();
- }
-
- @Test
- public void givenThrottleUsingRedis_preventsQuotaErrors() throws NonDeterministicException {
- URI uri =
- URI.create(String.format("redis://%s:%d", redis.getHost(), redis.getFirstMappedPort()));
- pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);
-
- Result<EchoRequest> throttleResult =
- createRequestStream()
- .apply(
- "throttle",
- ThrottleWithExternalResource.usingRedis(
- uri, QUOTA_ID, UUID.randomUUID().toString(), QUOTA, REQUEST_CODER));
-
- // Assert that we are not getting any errors and successfully emitting throttled elements.
- PAssert.that(throttleResult.getFailures()).empty();
- PAssert.thatSingleton(
- throttleResult
- .getResponses()
- .apply(
- "window throttled", Window.into(FixedWindows.of(Duration.standardSeconds(1L))))
- .apply(
- "count throttled",
- Combine.globally(Count.<EchoRequest>combineFn()).withoutDefaults()))
- .notEqualTo(0L);
-
- // Assert that all the throttled data is not corrupted.
- PAssert.that(
- throttleResult
- .getResponses()
- .apply(
- "window throttled before extraction",
- Window.into(FixedWindows.of(Duration.standardSeconds(1L))))
- .apply(
- "extract request id",
- MapElements.into(strings()).via(input -> checkStateNotNull(input).getId()))
- .apply("distinct", Distinct.create()))
- .containsInAnyOrder(QUOTA_ID);
-
- // Call the Echo service with throttled requests.
- Result<EchoResponse> echoResult =
- throttleResult
- .getResponses()
- .apply("call", Call.ofCallerAndSetupTeardown(client, RESPONSE_CODER));
-
- // Assert that there were no errors.
- PAssert.that(echoResult.getFailures()).empty();
-
- // Assert that the responses match the requests.
- PAssert.that(
- echoResult
- .getResponses()
- .apply(
- "window responses before extraction",
- Window.into(FixedWindows.of(Duration.standardSeconds(1L))))
- .apply(
- "extract response id",
- MapElements.into(strings()).via(input -> checkStateNotNull(input).getId())))
- .containsInAnyOrder(QUOTA_ID);
-
- PipelineResult job = pipeline.run();
- job.waitUntilFinish(Duration.standardSeconds(3L));
- }
-
- private static EchoRequest createRequest() {
- return EchoRequest.newBuilder().setId(QUOTA_ID).setPayload(PAYLOAD).build();
- }
-
- private PCollection<EchoRequest> createRequestStream() {
- return pipeline
- .apply("impulse", PeriodicImpulse.create().withInterval(Duration.millis(10L)))
- .apply(
- "requests",
- MapElements.into(TypeDescriptor.of(EchoRequest.class)).via(ignored -> createRequest()));
- }
-}
diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceTest.java
deleted file mode 100644
index 591ba923201..00000000000
--- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.io.requestresponse;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.assertThrows;
-
-import java.net.URI;
-import org.apache.beam.io.requestresponse.CallTest.Request;
-import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException;
-import org.joda.time.Duration;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link ThrottleWithExternalResource}. */
-@RunWith(JUnit4.class)
-public class ThrottleWithExternalResourceTest {
- @Rule public TestPipeline pipeline = TestPipeline.create();
-
- @Test
- public void givenNonDeterministicCoder_usingRedis_throwsError() throws NonDeterministicException {
- URI uri = URI.create("redis://localhost:6379");
- String quotaIdentifier = "quota";
- String queueKey = "queue";
- Quota quota = new Quota(10L, Duration.standardSeconds(1L));
-
- assertThrows(
- NonDeterministicException.class,
- () ->
- ThrottleWithExternalResource.usingRedis(
- uri, quotaIdentifier, queueKey, quota, CallTest.NON_DETERMINISTIC_REQUEST_CODER));
-
- ThrottleWithExternalResource.usingRedis(
- uri, quotaIdentifier, queueKey, quota, CallTest.DETERMINISTIC_REQUEST_CODER);
- }
-
- @Test
- public void givenWrongRedisURI_throwsError() throws NonDeterministicException {
- URI uri = URI.create("redis://1.2.3.4:6379");
- String quotaIdentifier = "quota";
- String queueKey = "queue";
- Quota quota = new Quota(10L, Duration.standardSeconds(1L));
- PCollection<Request> requests =
- pipeline.apply(Create.of(new Request(""))).setCoder(CallTest.DETERMINISTIC_REQUEST_CODER);
- requests.apply(
- ThrottleWithExternalResource.usingRedis(
- uri, quotaIdentifier, queueKey, quota, requests.getCoder()));
-
- UncheckedExecutionException error =
- assertThrows(UncheckedExecutionException.class, pipeline::run);
- assertThat(
- error.getCause().getMessage(),
- containsString("Failed to connect to host: redis://1.2.3.4:6379"));
- }
-}