Posted to github@beam.apache.org by "mosche (via GitHub)" <gi...@apache.org> on 2023/05/31 09:15:53 UTC

[GitHub] [beam] mosche opened a new pull request, #26946: [AWS Sqs] Add SqsIO.writeBatches for improved performance

mosche opened a new pull request, #26946:
URL: https://github.com/apache/beam/pull/26946

   This adds a new `SqsIO.writeBatches` transform that improves throughput by sending messages through the SQS batch API, using an async client.
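To illustrate where the throughput gain comes from: a `SendMessageBatch` request accepts at most 10 entries (the `MAX_BATCH_SIZE` in the diff), so N messages need roughly ceil(N / 10) requests instead of N. The sketch below is a hypothetical helper (`SqsBatching.partition` is not part of SqsIO) that only shows the chunking step and does not call AWS.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of SqsIO: splits messages into batches that
// respect the SQS SendMessageBatch limit of 10 entries per request.
final class SqsBatching {
  static final int MAX_BATCH_SIZE = 10; // mirrors WriteBatches.MAX_BATCH_SIZE

  static <T> List<List<T>> partition(List<T> messages, int batchSize) {
    if (batchSize <= 0 || batchSize > MAX_BATCH_SIZE) {
      throw new IllegalArgumentException("Maximum allowed batch size is " + MAX_BATCH_SIZE);
    }
    List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < messages.size(); start += batchSize) {
      int end = Math.min(start + batchSize, messages.size());
      // Copy the sub-list so each batch is independent of the input list.
      batches.add(new ArrayList<>(messages.subList(start, end)));
    }
    return batches;
  }

  private SqsBatching() {}
}
```

With 25 messages and a batch size of 10, this yields three requests (10, 10 and 5 entries) instead of 25.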
   
   KinesisIO's former `AsyncPutRecordsHandler` has been turned into a general-purpose handler (`AsyncBatchWriteHandler`), so it can also be used for SQS (and other services).
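What a general handler mainly needs from each service is a way to tell which records of a partially successful batch failed, so that only those are retried. A minimal sketch, assuming errors are matched to records by id, as the `record -> record.id()` and `error -> error.id()` functions passed to `AsyncBatchWriteHandler.byId` in the SqsIO diff suggest; `PartialFailures.failedRecords` is a hypothetical name, not the handler's actual method.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch: selects the records of a batch whose id appears in the
// error entries of a partially successful response, so only those get retried.
final class PartialFailures {
  static <RecT, ErrT> List<RecT> failedRecords(
      List<RecT> records,
      List<ErrT> errors,
      Function<RecT, String> recordId,
      Function<ErrT, String> errorId) {
    // Collect the ids reported as failed.
    Set<String> failedIds = new HashSet<>();
    for (ErrT error : errors) {
      failedIds.add(errorId.apply(error));
    }
    // Keep the original record order so retries preserve submission order.
    List<RecT> failed = new ArrayList<>();
    for (RecT record : records) {
      if (failedIds.contains(recordId.apply(record))) {
        failed.add(record);
      }
    }
    return failed;
  }

  private PartialFailures() {}
}
```

Keeping the id extraction as functions is what lets one handler serve several services: each service only supplies how to read an id from its request entry and its error entry.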
   
   (closes #21429)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on a diff in pull request #26946: [AWS Sqs] Add SqsIO.writeBatches for improved performance

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on code in PR #26946:
URL: https://github.com/apache/beam/pull/26946#discussion_r1259626710


##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java:
##########
@@ -241,7 +277,442 @@ public void setup(PipelineOptions options) throws Exception {
 
     @ProcessElement
     public void processElement(ProcessContext processContext) throws Exception {
+      if (sqs == null) {
+        throw new IllegalStateException("No SQS client");
+      }
       sqs.sendMessage(processContext.element());
     }
   }
+
+  /**
+   * A {@link PTransform} to send messages to SQS. See {@link SqsIO} for more information on usage
+   * and configuration.
+   */
+  @AutoValue
+  public abstract static class WriteBatches<T>
+      extends PTransform<PCollection<T>, WriteBatches.Result> {
+    private static final int DEFAULT_CONCURRENCY = 5;
+    private static final int MAX_BATCH_SIZE = 10;
+    private static final Duration DEFAULT_BATCH_TIMEOUT = Duration.standardSeconds(3);
+
+    abstract @Pure int concurrentRequests();
+
+    abstract @Pure Duration batchTimeout();
+
+    abstract @Pure int batchSize();
+
+    abstract @Pure ClientConfiguration clientConfiguration();
+
+    abstract @Pure EntryBuilder<T> entryBuilder();
+
+    abstract @Pure @Nullable DynamicDestination<T> dynamicDestination();
+
+    abstract @Pure @Nullable String queueUrl();
+
+    abstract Builder<T> builder();
+
+    public interface DynamicDestination<T> extends Serializable {
+      String queueUrl(T message);
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> concurrentRequests(int concurrentRequests);
+
+      abstract Builder<T> batchTimeout(Duration duration);
+
+      abstract Builder<T> batchSize(int batchSize);
+
+      abstract Builder<T> clientConfiguration(ClientConfiguration config);
+
+      abstract Builder<T> entryBuilder(EntryBuilder<T> entryBuilder);
+
+      abstract Builder<T> dynamicDestination(@Nullable DynamicDestination<T> destination);
+
+      abstract Builder<T> queueUrl(@Nullable String queueUrl);
+
+      abstract WriteBatches<T> build();
+    }
+
+    /** Configuration of SQS client. */
+    public WriteBatches<T> withClientConfiguration(ClientConfiguration config) {
+      checkArgument(config != null, "ClientConfiguration cannot be null");
+      return builder().clientConfiguration(config).build();
+    }
+
+    /** Max number of concurrent batch write requests per bundle, default is {@code 5}. */
+    public WriteBatches<T> withConcurrentRequests(int concurrentRequests) {
+      checkArgument(concurrentRequests > 0, "concurrentRequests must be > 0");
+      return builder().concurrentRequests(concurrentRequests).build();
+    }
+
+    /** The batch size to use, default (and AWS limit) is {@code 10}. */
+    public WriteBatches<T> withBatchSize(int batchSize) {
+      checkArgument(
+          batchSize > 0 && batchSize <= MAX_BATCH_SIZE,
+          "Maximum allowed batch size is " + MAX_BATCH_SIZE);
+      return builder().batchSize(batchSize).build();
+    }
+
+    /**
+     * The duration to accumulate records before timing out, default is 3 secs.
+     *
+     * <p>Timeouts will be checked upon arrival of new messages.
+     */
+    public WriteBatches<T> withBatchTimeout(Duration timeout) {
+      return builder().batchTimeout(timeout).build();
+    }
+
+    /** Dynamic record based destination to write to. */
+    public WriteBatches<T> to(DynamicDestination<T> destination) {
+      checkArgument(destination != null, "DynamicDestination cannot be null");
+      return builder().queueUrl(null).dynamicDestination(destination).build();
+    }
+
+    /** Queue url to write to. */
+    public WriteBatches<T> to(String queueUrl) {
+      checkArgument(queueUrl != null, "queueUrl cannot be null");
+      return builder().dynamicDestination(null).queueUrl(queueUrl).build();
+    }
+
+    @Override
+    public Result expand(PCollection<T> input) {
+      checkState(dynamicDestination() != null || queueUrl() != null, "to is required");

Review Comment:
   Removed this one; the check below validates exactly the same condition again.
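The `withBatchTimeout` Javadoc in the hunk above notes that timeouts are only checked when new messages arrive. A minimal sketch of what that implies, assuming a simple single-destination buffer (`TimedBatch` is hypothetical, not the PR's `Batches` class): a partial batch is submitted either when it is full or when the next message arrives after the timeout has elapsed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical sketch: a batch buffer whose timeout is only evaluated when a new
// message arrives. There is no background timer, so a partial batch waits for
// the next message or an explicit flush.
final class TimedBatch<T> {
  private final int batchSize;
  private final long timeoutMillis;
  private final LongSupplier clock; // injectable clock, e.g. System::currentTimeMillis
  private final List<T> buffer = new ArrayList<>();
  private long startedAt;

  TimedBatch(int batchSize, long timeoutMillis, LongSupplier clock) {
    this.batchSize = batchSize;
    this.timeoutMillis = timeoutMillis;
    this.clock = clock;
  }

  /** Adds a message; returns a batch to submit, or null to keep accumulating. */
  List<T> add(T message) {
    long now = clock.getAsLong();
    if (buffer.isEmpty()) {
      startedAt = now; // the timeout runs from the first message of a batch
    }
    buffer.add(message);
    boolean full = buffer.size() >= batchSize;
    boolean timedOut = now - startedAt >= timeoutMillis;
    return (full || timedOut) ? flush() : null;
  }

  /** Returns and clears whatever is buffered, e.g. when a bundle finishes. */
  List<T> flush() {
    List<T> batch = new ArrayList<>(buffer);
    buffer.clear();
    return batch;
  }
}
```

In this design, if the input pauses, the last partial batch is only sent by the next arrival or by the explicit flush, which in a `DoFn` would naturally happen in `@FinishBundle`.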



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java:
##########
@@ -241,7 +277,442 @@ public void setup(PipelineOptions options) throws Exception {
 
     @ProcessElement
     public void processElement(ProcessContext processContext) throws Exception {
+      if (sqs == null) {
+        throw new IllegalStateException("No SQS client");
+      }
       sqs.sendMessage(processContext.element());
     }
   }
+
+  /**
+   * A {@link PTransform} to send messages to SQS. See {@link SqsIO} for more information on usage
+   * and configuration.
+   */
+  @AutoValue
+  public abstract static class WriteBatches<T>
+      extends PTransform<PCollection<T>, WriteBatches.Result> {
+    private static final int DEFAULT_CONCURRENCY = 5;
+    private static final int MAX_BATCH_SIZE = 10;
+    private static final Duration DEFAULT_BATCH_TIMEOUT = Duration.standardSeconds(3);
+
+    abstract @Pure int concurrentRequests();
+
+    abstract @Pure Duration batchTimeout();
+
+    abstract @Pure int batchSize();
+
+    abstract @Pure ClientConfiguration clientConfiguration();
+
+    abstract @Pure EntryBuilder<T> entryBuilder();
+
+    abstract @Pure @Nullable DynamicDestination<T> dynamicDestination();
+
+    abstract @Pure @Nullable String queueUrl();
+
+    abstract Builder<T> builder();
+
+    public interface DynamicDestination<T> extends Serializable {
+      String queueUrl(T message);
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> concurrentRequests(int concurrentRequests);
+
+      abstract Builder<T> batchTimeout(Duration duration);
+
+      abstract Builder<T> batchSize(int batchSize);
+
+      abstract Builder<T> clientConfiguration(ClientConfiguration config);
+
+      abstract Builder<T> entryBuilder(EntryBuilder<T> entryBuilder);
+
+      abstract Builder<T> dynamicDestination(@Nullable DynamicDestination<T> destination);
+
+      abstract Builder<T> queueUrl(@Nullable String queueUrl);
+
+      abstract WriteBatches<T> build();
+    }
+
+    /** Configuration of SQS client. */
+    public WriteBatches<T> withClientConfiguration(ClientConfiguration config) {
+      checkArgument(config != null, "ClientConfiguration cannot be null");
+      return builder().clientConfiguration(config).build();
+    }
+
+    /** Max number of concurrent batch write requests per bundle, default is {@code 5}. */
+    public WriteBatches<T> withConcurrentRequests(int concurrentRequests) {
+      checkArgument(concurrentRequests > 0, "concurrentRequests must be > 0");
+      return builder().concurrentRequests(concurrentRequests).build();
+    }
+
+    /** The batch size to use, default (and AWS limit) is {@code 10}. */
+    public WriteBatches<T> withBatchSize(int batchSize) {
+      checkArgument(
+          batchSize > 0 && batchSize <= MAX_BATCH_SIZE,
+          "Maximum allowed batch size is " + MAX_BATCH_SIZE);
+      return builder().batchSize(batchSize).build();
+    }
+
+    /**
+     * The duration to accumulate records before timing out, default is 3 secs.
+     *
+     * <p>Timeouts will be checked upon arrival of new messages.
+     */
+    public WriteBatches<T> withBatchTimeout(Duration timeout) {
+      return builder().batchTimeout(timeout).build();
+    }
+
+    /** Dynamic record based destination to write to. */
+    public WriteBatches<T> to(DynamicDestination<T> destination) {
+      checkArgument(destination != null, "DynamicDestination cannot be null");
+      return builder().queueUrl(null).dynamicDestination(destination).build();
+    }
+
+    /** Queue url to write to. */
+    public WriteBatches<T> to(String queueUrl) {
+      checkArgument(queueUrl != null, "queueUrl cannot be null");
+      return builder().dynamicDestination(null).queueUrl(queueUrl).build();
+    }
+
+    @Override
+    public Result expand(PCollection<T> input) {
+      checkState(dynamicDestination() != null || queueUrl() != null, "to is required");
+      AwsOptions awsOptions = input.getPipeline().getOptions().as(AwsOptions.class);
+      ClientBuilderFactory.validate(awsOptions, clientConfiguration());
+
+      input.apply(
+          ParDo.of(
+              new DoFn<T, Void>() {
+                private @Nullable BatchHandler<T> handler = null;
+
+                @Setup
+                public void setup(PipelineOptions options) {
+                  handler = new BatchHandler<>(WriteBatches.this, options.as(AwsOptions.class));
+                }
+
+                @StartBundle
+                public void startBundle() {
+                  handler().startBundle();
+                }
+
+                @ProcessElement
+                public void processElement(ProcessContext cxt) throws Throwable {
+                  handler().process(cxt.element());
+                }
+
+                @FinishBundle
+                public void finishBundle() throws Throwable {
+                  handler().finishBundle();
+                }
+
+                @Teardown
+                public void teardown() throws Exception {
+                  if (handler != null) {
+                    handler.close();
+                    handler = null;
+                  }
+                }
+
+                private BatchHandler<T> handler() {
+                  return checkStateNotNull(handler, "SQS handler is null");
+                }
+              }));
+      return new Result(input.getPipeline());
+    }
+
+    /** Batch entry builder. */
+    public interface EntryBuilder<T>
+        extends BiConsumer<SendMessageBatchRequestEntry.Builder, T>, Serializable {}
+
+    /** Result of {@link #writeBatches}. */
+    public static class Result implements POutput {
+      private final Pipeline pipeline;
+
+      private Result(Pipeline pipeline) {
+        this.pipeline = pipeline;
+      }
+
+      @Override
+      public Pipeline getPipeline() {
+        return pipeline;
+      }
+
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        return ImmutableMap.of();
+      }
+
+      @Override
+      public void finishSpecifyingOutput(
+          String transformName, PInput input, PTransform<?, ?> transform) {}
+    }
+
+    private static class BatchHandler<T> implements AutoCloseable {
+      private final WriteBatches<T> spec;
+      private final SqsAsyncClient sqs;
+      private final Batches batches;
+      private final AsyncBatchWriteHandler<SendMessageBatchRequestEntry, BatchResultErrorEntry>
+          handler;
+
+      BatchHandler(WriteBatches<T> spec, AwsOptions options) {
+        this.spec = spec;
+        this.sqs = buildClient(options, SqsAsyncClient.builder(), spec.clientConfiguration());
+        this.handler =
+            AsyncBatchWriteHandler.byId(
+                spec.concurrentRequests(),
+                spec.batchSize(),
+                spec.clientConfiguration().retry(),
+                Stats.NONE,
+                (queue, records) -> sendMessageBatch(sqs, queue, records),
+                error -> error.code(),
+                record -> record.id(),
+                error -> error.id());
+        if (spec.queueUrl() != null) {
+          this.batches = new Single(spec.queueUrl());
+        } else if (spec.dynamicDestination() != null) {
+          this.batches = new Dynamic(spec.dynamicDestination());
+        } else {
+          throw new IllegalStateException("queueUrl or dynamicDestination required");

Review Comment:
   👍 
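The constructor above chooses between a single queue URL and a per-message `DynamicDestination`. Because one `SendMessageBatch` request targets exactly one queue URL, a dynamic destination implies keeping one buffer per queue. A hypothetical sketch of that idea (`PerQueueBatches` is not the PR's `Dynamic` class, and `submit` stands in for the async `sendMessageBatch` call):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical sketch: one buffer per queue URL, because a SendMessageBatch
// request can only target a single queue.
final class PerQueueBatches<T> {
  private final int batchSize;
  private final Function<T, String> queueUrlFn; // plays the role of DynamicDestination.queueUrl
  private final BiConsumer<String, List<T>> submit; // stand-in for the async batch write
  private final Map<String, List<T>> buffers = new LinkedHashMap<>();

  PerQueueBatches(
      int batchSize, Function<T, String> queueUrlFn, BiConsumer<String, List<T>> submit) {
    this.batchSize = batchSize;
    this.queueUrlFn = queueUrlFn;
    this.submit = submit;
  }

  void add(T message) {
    String queueUrl = queueUrlFn.apply(message);
    List<T> buffer = buffers.computeIfAbsent(queueUrl, url -> new ArrayList<>());
    buffer.add(message);
    if (buffer.size() >= batchSize) {
      buffers.remove(queueUrl); // start a fresh buffer for this queue
      submit.accept(queueUrl, buffer);
    }
  }

  /** Submits all partially filled batches, e.g. at the end of a bundle. */
  void flushAll() {
    buffers.forEach(submit);
    buffers.clear();
  }
}
```

The single-queue case is the same structure with a constant `queueUrlFn`, which only ever creates one buffer.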






[GitHub] [beam] mosche commented on a diff in pull request #26946: [AWS Sqs] Add SqsIO.writeBatches for improved performance

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on code in PR #26946:
URL: https://github.com/apache/beam/pull/26946#discussion_r1223592159


##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/AsyncBatchWriteHandler.java:
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.common;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.counting;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.joining;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicates.notNull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.joda.time.DateTimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Async handler that automatically retries unprocessed records in case of a partial success.
+ *
+ * <p>The handler enforces the provided upper limit of concurrent requests. Once that limit is
+ * reached any further call to {@link #batchWrite(String, List)} will block until another request
+ * completed.
+ *
+ * <p>The handler is fail fast and won't submit any further request after a failure. Async failures
+ * can be polled using {@link #checkForAsyncFailure()}.
+ *
+ * @param <RecT> Record type in batch
+ * @param <ResT> Potentially erroneous result that needs to be correlated to a record using {@link
+ *     #failedRecords(List, List)}
+ */
+@NotThreadSafe
+@Internal
+public abstract class AsyncBatchWriteHandler<RecT, ResT> {
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchWriteHandler.class);
+  private final FluentBackoff backoff;
+  private final int concurrentRequests;
+  private final Stats stats;
+  protected final BiFunction<String, List<RecT>, CompletableFuture<List<ResT>>> submitFn;
+  protected final Function<ResT, String> errorCodeFn;
+
+  private AtomicBoolean hasErrored;
+  private AtomicReference<Throwable> asyncFailure;
+  private Semaphore pendingRequests;

Review Comment:
   👍 `requestsInProgress` is much better, thanks. It's the completion that is pending, not the request:
   ``` 
     public final int requestsInProgress() {
       return concurrentRequests - requestPermits.availablePermits();
     }
   ```
   The semaphore itself was renamed to `requestPermits`.
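A minimal sketch of how a semaphore can enforce the concurrency limit described in the `AsyncBatchWriteHandler` Javadoc (further calls block once the limit is reached) and yield `requestsInProgress()` as in the snippet above. `BoundedAsyncSubmitter` is hypothetical and omits the retries and fail-fast error tracking of the real handler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical sketch, not Beam's AsyncBatchWriteHandler: bounds the number of
// in-flight async requests with a semaphore.
final class BoundedAsyncSubmitter {
  private final int concurrentRequests;
  private final Semaphore requestPermits;

  BoundedAsyncSubmitter(int concurrentRequests) {
    this.concurrentRequests = concurrentRequests;
    this.requestPermits = new Semaphore(concurrentRequests);
  }

  /** Blocks while the limit is reached, then starts the request. */
  <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> request)
      throws InterruptedException {
    requestPermits.acquire();
    CompletableFuture<T> future;
    try {
      future = request.get();
    } catch (RuntimeException e) {
      requestPermits.release(); // the request never started
      throw e;
    }
    // Return the permit once the request completes, successfully or not.
    return future.whenComplete((result, error) -> requestPermits.release());
  }

  /** Requests that were submitted but have not completed yet. */
  int requestsInProgress() {
    return concurrentRequests - requestPermits.availablePermits();
  }
}
```

Releasing the permit on completion rather than on submission is what makes the count reflect requests whose completion is still pending.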





[GitHub] [beam] mosche commented on a diff in pull request #26946: [AWS Sqs] Add SqsIO.writeBatches for improved performance

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on code in PR #26946:
URL: https://github.com/apache/beam/pull/26946#discussion_r1259613175


##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java:
##########
@@ -241,7 +277,442 @@ public void setup(PipelineOptions options) throws Exception {
 
     @ProcessElement
     public void processElement(ProcessContext processContext) throws Exception {
+      if (sqs == null) {

Review Comment:
   It's a transient field, so in theory it can be null, and the checker complains unless it is checked.
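Why a transient field can be null: Java serialization skips `transient` fields and does not run the class's field initializers on the deserialized copy, so the field stays `null` until it is re-initialized, which in a Beam `DoFn` typically happens in `@Setup`. A small, self-contained demonstration; `TransientDemo` and its plain `Object` standing in for the SQS client are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo: a transient field is not serialized, so it is null on the
// deserialized copy until something (e.g. @Setup in a DoFn) initializes it again.
final class TransientDemo implements Serializable {
  private static final long serialVersionUID = 1L;

  transient Object client = new Object(); // stands in for an SQS client

  static <T extends Serializable> T roundTrip(T value)
      throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      @SuppressWarnings("unchecked")
      T copy = (T) in.readObject();
      return copy;
    }
  }

  void send(String message) {
    Object c = client;
    if (c == null) {
      // Same guard as in the diff; it also satisfies a nullness checker.
      throw new IllegalStateException("No SQS client");
    }
    // c would be used to send the message here.
  }
}
```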





[GitHub] [beam] github-actions[bot] commented on pull request #26946: [AWS Sqs] Add SqsIO.writeBatches for improved performance

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26946:
URL: https://github.com/apache/beam/pull/26946#issuecomment-1569815709

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] mosche commented on pull request #26946: [AWS Sqs] Add SqsIO.writeBatches for improved performance

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on PR #26946:
URL: https://github.com/apache/beam/pull/26946#issuecomment-1578921538

   Thanks @psolomin, good catch. That shouldn't happen; I've fixed it. 




[GitHub] [beam] mosche commented on pull request #26946: [AWS Sqs] Add SqsIO.writeBatches for improved performance

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on PR #26946:
URL: https://github.com/apache/beam/pull/26946#issuecomment-1630693082

   > Also, I was a bit confused to see changes to KinesisIO as well, even though the PR title only mentions SqsIO. Never mind, but in the future it would be clearer to split such a PR into two or name it accordingly.
   
   The changes on the Kinesis side only extract some common base functionality; nothing functional changes. 




[GitHub] [beam] psolomin commented on a diff in pull request #26946: [AWS Sqs] Add SqsIO.writeBatches for improved performance

Posted by "psolomin (via GitHub)" <gi...@apache.org>.
psolomin commented on code in PR #26946:
URL: https://github.com/apache/beam/pull/26946#discussion_r1222576608


##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java:
##########
@@ -1328,7 +1332,7 @@ void addClientRecord(int recordBytes) {
       }
 
       @Override
-      public void addPutRecordsRequest(long latencyMillis, boolean isPartialRetry) {
+      public void addBatchWriteRequest(long latencyMillis, boolean isPartialRetry) {

Review Comment:
   [side note] While reading this PR, I've noticed:
   
   ```
   ... Concurrency settings above the default have caused a bug in the AWS SDK v2. ...
   public Write<T> withConcurrentRequests(int concurrentRequests)
   ```
   Does this still hold after the recent AWS SDK version bump? Should we create an issue for Beam to expose this setting once the AWS SDK is fixed?



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/AsyncBatchWriteHandler.java:
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.common;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.counting;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.joining;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicates.notNull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.joda.time.DateTimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Async handler that automatically retries unprocessed records in case of a partial success.
+ *
+ * <p>The handler enforces the provided upper limit of concurrent requests. Once that limit is
+ * reached any further call to {@link #batchWrite(String, List)} will block until another request
+ * completed.
+ *
+ * <p>The handler is fail fast and won't submit any further request after a failure. Async failures
+ * can be polled using {@link #checkForAsyncFailure()}.
+ *
+ * @param <RecT> Record type in batch
+ * @param <ResT> Potentially erroneous result that needs to be correlated to a record using {@link
+ *     #failedRecords(List, List)}
+ */
+@NotThreadSafe
+@Internal
+public abstract class AsyncBatchWriteHandler<RecT, ResT> {
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchWriteHandler.class);
+  private final FluentBackoff backoff;
+  private final int concurrentRequests;
+  private final Stats stats;
+  protected final BiFunction<String, List<RecT>, CompletableFuture<List<ResT>>> submitFn;
+  protected final Function<ResT, String> errorCodeFn;
+
+  private AtomicBoolean hasErrored;
+  private AtomicReference<Throwable> asyncFailure;
+  private Semaphore pendingRequests;

Review Comment:
   [nitpick] Is it really `pending`? It looks like `requestsInProgress` would better reflect its meaning.
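For context on the naming question: in a handler like this, each semaphore permit stands for a request that was submitted but has not completed yet. Below is a minimal, hypothetical sketch (`BoundedAsyncWriter` and its methods are illustrative names, not Beam API, and not the PR's `AsyncBatchWriteHandler`) of the behaviour described in the Javadoc above: block once the concurrency limit is reached, remember the first async failure, and refuse to submit further requests after it.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/** Hypothetical illustration only; not the AsyncBatchWriteHandler from this PR. */
final class BoundedAsyncWriter<T> {
  private final int maxConcurrency;
  // Each acquired permit is one request that was submitted but has not completed yet.
  private final Semaphore requestsInProgress;
  // First async failure observed; later failures are ignored.
  private final AtomicReference<Throwable> asyncFailure = new AtomicReference<>();
  private final Function<List<T>, CompletableFuture<Void>> submitFn;

  BoundedAsyncWriter(int maxConcurrency, Function<List<T>, CompletableFuture<Void>> submitFn) {
    this.maxConcurrency = maxConcurrency;
    this.requestsInProgress = new Semaphore(maxConcurrency);
    this.submitFn = submitFn;
  }

  /** Submits a batch, blocking while maxConcurrency requests are still in progress. */
  void write(List<T> batch) throws IOException, InterruptedException {
    checkForAsyncFailure(); // fail fast: never submit after a failure
    requestsInProgress.acquire();
    CompletableFuture<Void> response;
    try {
      response = submitFn.apply(batch);
    } catch (RuntimeException e) {
      requestsInProgress.release(); // nothing was submitted
      throw e;
    }
    response.whenComplete(
        (ignored, error) -> {
          if (error != null) {
            asyncFailure.compareAndSet(null, error);
          }
          requestsInProgress.release(); // completed, so no longer in progress
        });
  }

  /** Rethrows the first async failure, if any, wrapped in an IOException. */
  void checkForAsyncFailure() throws IOException {
    Throwable failure = asyncFailure.get();
    if (failure != null) {
      throw new IOException("Async batch write failed", failure);
    }
  }

  /** Blocks until all in-progress requests completed, e.g. when finishing a bundle. */
  void waitForCompletion() throws IOException, InterruptedException {
    requestsInProgress.acquire(maxConcurrency);
    requestsInProgress.release(maxConcurrency);
    checkForAsyncFailure();
  }
}
```

Seen this way, the name `requestsInProgress` fits: the request itself has already been sent, and only its completion is outstanding.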





[GitHub] [beam] mosche commented on pull request #26946: [AWS Sqs] Add SqsIO.writeBatches for improved performance

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on PR #26946:
URL: https://github.com/apache/beam/pull/26946#issuecomment-1630665370

   > I wonder though, should we keep the existing write() long term, then? Or mark it as deprecated and eventually drop it?
   
   @psolomin I'm still unsure how to proceed. I was also considering re-implementing `write` using `writeBatches` internally...




[GitHub] [beam] aromanenko-dev commented on a diff in pull request #26946: [AWS Sqs] Add SqsIO.writeBatches for improved performance

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on code in PR #26946:
URL: https://github.com/apache/beam/pull/26946#discussion_r1254628135


##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java:
##########
@@ -241,7 +277,442 @@ public void setup(PipelineOptions options) throws Exception {
 
     @ProcessElement
     public void processElement(ProcessContext processContext) throws Exception {
+      if (sqs == null) {

Review Comment:
   Is this possible? How could it happen?



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java:
##########
@@ -241,7 +277,442 @@ public void setup(PipelineOptions options) throws Exception {
 
     @ProcessElement
     public void processElement(ProcessContext processContext) throws Exception {
+      if (sqs == null) {
+        throw new IllegalStateException("No SQS client");
+      }
       sqs.sendMessage(processContext.element());
     }
   }
+
+  /**
+   * A {@link PTransform} to send messages to SQS. See {@link SqsIO} for more information on usage
+   * and configuration.
+   */
+  @AutoValue
+  public abstract static class WriteBatches<T>
+      extends PTransform<PCollection<T>, WriteBatches.Result> {
+    private static final int DEFAULT_CONCURRENCY = 5;
+    private static final int MAX_BATCH_SIZE = 10;
+    private static final Duration DEFAULT_BATCH_TIMEOUT = Duration.standardSeconds(3);
+
+    abstract @Pure int concurrentRequests();
+
+    abstract @Pure Duration batchTimeout();
+
+    abstract @Pure int batchSize();
+
+    abstract @Pure ClientConfiguration clientConfiguration();
+
+    abstract @Pure EntryBuilder<T> entryBuilder();
+
+    abstract @Pure @Nullable DynamicDestination<T> dynamicDestination();
+
+    abstract @Pure @Nullable String queueUrl();
+
+    abstract Builder<T> builder();
+
+    public interface DynamicDestination<T> extends Serializable {
+      String queueUrl(T message);
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> concurrentRequests(int concurrentRequests);
+
+      abstract Builder<T> batchTimeout(Duration duration);
+
+      abstract Builder<T> batchSize(int batchSize);
+
+      abstract Builder<T> clientConfiguration(ClientConfiguration config);
+
+      abstract Builder<T> entryBuilder(EntryBuilder<T> entryBuilder);
+
+      abstract Builder<T> dynamicDestination(@Nullable DynamicDestination<T> destination);
+
+      abstract Builder<T> queueUrl(@Nullable String queueUrl);
+
+      abstract WriteBatches<T> build();
+    }
+
+    /** Configuration of SQS client. */
+    public WriteBatches<T> withClientConfiguration(ClientConfiguration config) {
+      checkArgument(config != null, "ClientConfiguration cannot be null");
+      return builder().clientConfiguration(config).build();
+    }
+
+    /** Max number of concurrent batch write requests per bundle, default is {@code 5}. */
+    public WriteBatches<T> withConcurrentRequests(int concurrentRequests) {
+      checkArgument(concurrentRequests > 0, "concurrentRequests must be > 0");
+      return builder().concurrentRequests(concurrentRequests).build();
+    }
+
+    /** The batch size to use, default (and AWS limit) is {@code 10}. */
+    public WriteBatches<T> withBatchSize(int batchSize) {
+      checkArgument(
+          batchSize > 0 && batchSize <= MAX_BATCH_SIZE,
+          "Maximum allowed batch size is " + MAX_BATCH_SIZE);
+      return builder().batchSize(batchSize).build();
+    }
+
+    /**
+     * The duration to accumulate records before timing out, default is 3 secs.
+     *
+     * <p>Timeouts will be checked upon arrival of new messages.
+     */
+    public WriteBatches<T> withBatchTimeout(Duration timeout) {
+      return builder().batchTimeout(timeout).build();
+    }
+
+    /** Dynamic record based destination to write to. */
+    public WriteBatches<T> to(DynamicDestination<T> destination) {
+      checkArgument(destination != null, "DynamicDestination cannot be null");
+      return builder().queueUrl(null).dynamicDestination(destination).build();
+    }
+
+    /** Queue url to write to. */
+    public WriteBatches<T> to(String queueUrl) {
+      checkArgument(queueUrl != null, "queueUrl cannot be null");
+      return builder().dynamicDestination(null).queueUrl(queueUrl).build();
+    }
+
+    @Override
+    public Result expand(PCollection<T> input) {
+      checkState(dynamicDestination() != null || queueUrl() != null, "to is required");

Review Comment:
   nit: `WriteBatches.to() is required`
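The hunk above caps the batch size at 10 (the AWS limit) and notes that batch timeouts are only checked when new messages arrive. A minimal sketch of per-queue batching with those two flush triggers follows; `DestinationBatcher`, its injectable clock and the `flushFn` callback are hypothetical names and only approximate what the PR's `Single`/`Dynamic` batches might do.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;

/** Hypothetical illustration only; not the Batches/Single/Dynamic classes from this PR. */
final class DestinationBatcher<T> {
  private static final int MAX_BATCH_SIZE = 10; // AWS limit for SendMessageBatch entries

  private static final class Batch<E> {
    final long createdMillis;
    final List<E> entries = new ArrayList<>();

    Batch(long createdMillis) {
      this.createdMillis = createdMillis;
    }
  }

  private final int batchSize;
  private final long timeoutMillis;
  private final LongSupplier clock;
  private final BiConsumer<String, List<T>> flushFn; // e.g. submits one batch request
  private final Map<String, Batch<T>> batches = new HashMap<>(); // keyed by queue URL

  DestinationBatcher(
      int batchSize, long timeoutMillis, LongSupplier clock, BiConsumer<String, List<T>> flushFn) {
    if (batchSize < 1 || batchSize > MAX_BATCH_SIZE) {
      throw new IllegalArgumentException("Maximum allowed batch size is " + MAX_BATCH_SIZE);
    }
    this.batchSize = batchSize;
    this.timeoutMillis = timeoutMillis;
    this.clock = clock;
    this.flushFn = flushFn;
  }

  /** Adds an entry; flushes its batch when full and any batch that timed out. */
  void add(String queueUrl, T entry) {
    long now = clock.getAsLong();
    Batch<T> batch = batches.computeIfAbsent(queueUrl, url -> new Batch<>(now));
    batch.entries.add(entry);
    if (batch.entries.size() >= batchSize) {
      flush(queueUrl);
    }
    // Timeouts are only evaluated here, i.e. upon arrival of a new message.
    List<String> expired = new ArrayList<>();
    for (Map.Entry<String, Batch<T>> e : batches.entrySet()) {
      if (now - e.getValue().createdMillis >= timeoutMillis) {
        expired.add(e.getKey());
      }
    }
    expired.forEach(this::flush);
  }

  /** Flushes all remaining batches, e.g. at the end of a bundle. */
  void flushAll() {
    new ArrayList<>(batches.keySet()).forEach(this::flush);
  }

  private void flush(String queueUrl) {
    Batch<T> batch = batches.remove(queueUrl);
    if (batch != null && !batch.entries.isEmpty()) {
      flushFn.accept(queueUrl, batch.entries);
    }
  }
}
```

Because timeouts are only evaluated inside `add`, a batch can wait longer than the timeout if no further messages arrive, so a sketch like this relies on a final `flushAll()` (for instance in `@FinishBundle`).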



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java:
##########
@@ -241,7 +277,442 @@ public void setup(PipelineOptions options) throws Exception {
 
     @ProcessElement
     public void processElement(ProcessContext processContext) throws Exception {
+      if (sqs == null) {
+        throw new IllegalStateException("No SQS client");
+      }
       sqs.sendMessage(processContext.element());
     }
   }
+
+  /**
+   * A {@link PTransform} to send messages to SQS. See {@link SqsIO} for more information on usage
+   * and configuration.
+   */
+  @AutoValue
+  public abstract static class WriteBatches<T>
+      extends PTransform<PCollection<T>, WriteBatches.Result> {
+    private static final int DEFAULT_CONCURRENCY = 5;
+    private static final int MAX_BATCH_SIZE = 10;
+    private static final Duration DEFAULT_BATCH_TIMEOUT = Duration.standardSeconds(3);
+
+    abstract @Pure int concurrentRequests();
+
+    abstract @Pure Duration batchTimeout();
+
+    abstract @Pure int batchSize();
+
+    abstract @Pure ClientConfiguration clientConfiguration();
+
+    abstract @Pure EntryBuilder<T> entryBuilder();
+
+    abstract @Pure @Nullable DynamicDestination<T> dynamicDestination();
+
+    abstract @Pure @Nullable String queueUrl();
+
+    abstract Builder<T> builder();
+
+    public interface DynamicDestination<T> extends Serializable {
+      String queueUrl(T message);
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> concurrentRequests(int concurrentRequests);
+
+      abstract Builder<T> batchTimeout(Duration duration);
+
+      abstract Builder<T> batchSize(int batchSize);
+
+      abstract Builder<T> clientConfiguration(ClientConfiguration config);
+
+      abstract Builder<T> entryBuilder(EntryBuilder<T> entryBuilder);
+
+      abstract Builder<T> dynamicDestination(@Nullable DynamicDestination<T> destination);
+
+      abstract Builder<T> queueUrl(@Nullable String queueUrl);
+
+      abstract WriteBatches<T> build();
+    }
+
+    /** Configuration of SQS client. */
+    public WriteBatches<T> withClientConfiguration(ClientConfiguration config) {
+      checkArgument(config != null, "ClientConfiguration cannot be null");
+      return builder().clientConfiguration(config).build();
+    }
+
+    /** Max number of concurrent batch write requests per bundle, default is {@code 5}. */
+    public WriteBatches<T> withConcurrentRequests(int concurrentRequests) {
+      checkArgument(concurrentRequests > 0, "concurrentRequests must be > 0");
+      return builder().concurrentRequests(concurrentRequests).build();
+    }
+
+    /** The batch size to use, default (and AWS limit) is {@code 10}. */
+    public WriteBatches<T> withBatchSize(int batchSize) {
+      checkArgument(
+          batchSize > 0 && batchSize <= MAX_BATCH_SIZE,
+          "Maximum allowed batch size is " + MAX_BATCH_SIZE);
+      return builder().batchSize(batchSize).build();
+    }
+
+    /**
+     * The duration to accumulate records before timing out, default is 3 secs.
+     *
+     * <p>Timeouts will be checked upon arrival of new messages.
+     */
+    public WriteBatches<T> withBatchTimeout(Duration timeout) {
+      return builder().batchTimeout(timeout).build();
+    }
+
+    /** Dynamic record based destination to write to. */
+    public WriteBatches<T> to(DynamicDestination<T> destination) {
+      checkArgument(destination != null, "DynamicDestination cannot be null");
+      return builder().queueUrl(null).dynamicDestination(destination).build();
+    }
+
+    /** Queue url to write to. */
+    public WriteBatches<T> to(String queueUrl) {
+      checkArgument(queueUrl != null, "queueUrl cannot be null");
+      return builder().dynamicDestination(null).queueUrl(queueUrl).build();
+    }
+
+    @Override
+    public Result expand(PCollection<T> input) {
+      checkState(dynamicDestination() != null || queueUrl() != null, "to is required");
+      AwsOptions awsOptions = input.getPipeline().getOptions().as(AwsOptions.class);
+      ClientBuilderFactory.validate(awsOptions, clientConfiguration());
+
+      input.apply(
+          ParDo.of(
+              new DoFn<T, Void>() {
+                private @Nullable BatchHandler<T> handler = null;
+
+                @Setup
+                public void setup(PipelineOptions options) {
+                  handler = new BatchHandler<>(WriteBatches.this, options.as(AwsOptions.class));
+                }
+
+                @StartBundle
+                public void startBundle() {
+                  handler().startBundle();
+                }
+
+                @ProcessElement
+                public void processElement(ProcessContext cxt) throws Throwable {
+                  handler().process(cxt.element());
+                }
+
+                @FinishBundle
+                public void finishBundle() throws Throwable {
+                  handler().finishBundle();
+                }
+
+                @Teardown
+                public void teardown() throws Exception {
+                  if (handler != null) {
+                    handler.close();
+                    handler = null;
+                  }
+                }
+
+                private BatchHandler<T> handler() {
+                  return checkStateNotNull(handler, "SQS handler is null");
+                }
+              }));
+      return new Result(input.getPipeline());
+    }
+
+    /** Batch entry builder. */
+    public interface EntryBuilder<T>
+        extends BiConsumer<SendMessageBatchRequestEntry.Builder, T>, Serializable {}
+
+    /** Result of {@link #writeBatches}. */
+    public static class Result implements POutput {
+      private final Pipeline pipeline;
+
+      private Result(Pipeline pipeline) {
+        this.pipeline = pipeline;
+      }
+
+      @Override
+      public Pipeline getPipeline() {
+        return pipeline;
+      }
+
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        return ImmutableMap.of();
+      }
+
+      @Override
+      public void finishSpecifyingOutput(
+          String transformName, PInput input, PTransform<?, ?> transform) {}
+    }
+
+    private static class BatchHandler<T> implements AutoCloseable {
+      private final WriteBatches<T> spec;
+      private final SqsAsyncClient sqs;
+      private final Batches batches;
+      private final AsyncBatchWriteHandler<SendMessageBatchRequestEntry, BatchResultErrorEntry>
+          handler;
+
+      BatchHandler(WriteBatches<T> spec, AwsOptions options) {
+        this.spec = spec;
+        this.sqs = buildClient(options, SqsAsyncClient.builder(), spec.clientConfiguration());
+        this.handler =
+            AsyncBatchWriteHandler.byId(
+                spec.concurrentRequests(),
+                spec.batchSize(),
+                spec.clientConfiguration().retry(),
+                Stats.NONE,
+                (queue, records) -> sendMessageBatch(sqs, queue, records),
+                error -> error.code(),
+                record -> record.id(),
+                error -> error.id());
+        if (spec.queueUrl() != null) {
+          this.batches = new Single(spec.queueUrl());
+        } else if (spec.dynamicDestination() != null) {
+          this.batches = new Dynamic(spec.dynamicDestination());
+        } else {
+          throw new IllegalStateException("queueUrl or dynamicDestination required");

Review Comment:
   nit: `... is required`
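`AsyncBatchWriteHandler.byId(...)` in the hunk above receives `record -> record.id()` and `error -> error.id()`, i.e. failed `BatchResultErrorEntry` results are matched back to the submitted entries by id so that only those entries are retried after a partial success. Below is a minimal, generic sketch of that correlation step, assuming ids are unique within a batch (SQS requires distinct entry ids per batch request). `FailedRecordsById` is a hypothetical helper, not the PR's code, and it uses plain types instead of the AWS SDK classes so it runs standalone.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical illustration only; not the PR's AsyncBatchWriteHandler.byId implementation. */
final class FailedRecordsById {
  private FailedRecordsById() {}

  /** Returns the records whose id appears in the error results, preserving record order. */
  static <RecT, ErrT> List<RecT> failedRecords(
      List<RecT> records,
      List<ErrT> errors,
      Function<RecT, String> recordId,
      Function<ErrT, String> errorId) {
    if (errors.isEmpty()) {
      return new ArrayList<>(); // full success, nothing to retry
    }
    Set<String> failedIds = new HashSet<>();
    for (ErrT error : errors) {
      failedIds.add(errorId.apply(error));
    }
    List<RecT> failed = new ArrayList<>();
    for (RecT rec : records) {
      if (failedIds.contains(recordId.apply(rec))) {
        failed.add(rec); // this entry needs to be resubmitted
      }
    }
    return failed;
  }
}
```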





[GitHub] [beam] mosche commented on pull request #26946: [AWS Sqs] Add SqsIO.writeBatches for improved performance

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on PR #26946:
URL: https://github.com/apache/beam/pull/26946#issuecomment-1569813110

   R: @aromanenko-dev 
   R: @psolomin 
   
   No rush on this one; I'll be off for quite a while starting mid next week.




[GitHub] [beam] mosche merged pull request #26946: [AWS Sqs] Add SqsIO.writeBatches for improved performance

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche merged PR #26946:
URL: https://github.com/apache/beam/pull/26946




[GitHub] [beam] mosche commented on a diff in pull request #26946: [AWS Sqs] Add SqsIO.writeBatches for improved performance

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on code in PR #26946:
URL: https://github.com/apache/beam/pull/26946#discussion_r1223597027


##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java:
##########
@@ -1328,7 +1332,7 @@ void addClientRecord(int recordBytes) {
       }
 
       @Override
-      public void addPutRecordsRequest(long latencyMillis, boolean isPartialRetry) {
+      public void addBatchWriteRequest(long latencyMillis, boolean isPartialRetry) {

Review Comment:
   Actually, I'm not sure about this at the moment. I'll make a note and look into it later, when I'm back from PTO.





[GitHub] [beam] mosche commented on a diff in pull request #26946: [AWS Sqs] Add SqsIO.writeBatches for improved performance

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on code in PR #26946:
URL: https://github.com/apache/beam/pull/26946#discussion_r1223592159


##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/AsyncBatchWriteHandler.java:
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.common;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.counting;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.joining;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicates.notNull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.joda.time.DateTimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Async handler that automatically retries unprocessed records in case of a partial success.
+ *
+ * <p>The handler enforces the provided upper limit of concurrent requests. Once that limit is
+ * reached any further call to {@link #batchWrite(String, List)} will block until another request
+ * completed.
+ *
+ * <p>The handler is fail fast and won't submit any further request after a failure. Async failures
+ * can be polled using {@link #checkForAsyncFailure()}.
+ *
+ * @param <RecT> Record type in batch
+ * @param <ResT> Potentially erroneous result that needs to be correlated to a record using {@link
+ *     #failedRecords(List, List)}
+ */
+@NotThreadSafe
+@Internal
+public abstract class AsyncBatchWriteHandler<RecT, ResT> {
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchWriteHandler.class);
+  private final FluentBackoff backoff;
+  private final int concurrentRequests;
+  private final Stats stats;
+  protected final BiFunction<String, List<RecT>, CompletableFuture<List<ResT>>> submitFn;
+  protected final Function<ResT, String> errorCodeFn;
+
+  private AtomicBoolean hasErrored;
+  private AtomicReference<Throwable> asyncFailure;
+  private Semaphore pendingRequests;

Review Comment:
   👍 `requestsInProgress` is much better, thanks. It's the completion that is pending, not the request.


