Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/03/17 17:01:34 UTC

[GitHub] [beam] mosche opened a new pull request #17113: [BEAM-14104] Support shard aware aggregation in Kinesis writer.

mosche opened a new pull request #17113:
URL: https://github.com/apache/beam/pull/17113


   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mosche commented on a change in pull request #17113: [BEAM-14104] Support shard aware aggregation in Kinesis writer.

Posted by GitBox <gi...@apache.org>.
mosche commented on a change in pull request #17113:
URL: https://github.com/apache/beam/pull/17113#discussion_r830838124



##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java
##########
@@ -947,25 +1003,37 @@ private void validateExplicitHashKey(String hashKey) {
      * with KCL to correctly implement the binary protocol, specifically {@link
      * software.amazon.kinesis.retrieval.kpl.Messages.AggregatedRecord}.
      *
-     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. While the KPL is aware
-     * of effective hash key ranges assigned to each shard, we're not and don't want to be to keep
-     * complexity manageable and avoid the risk of silently loosing records in the KCL:
+     * <p>To aggregate records the best possible way, records are assigned an explicit hash key that
+     * corresponds to the lower bound of the hash key range of the target shard. In case a record
+     * has already an explicit hash key assigned, it is kept unchanged.
      *
-     * <p>{@link software.amazon.kinesis.retrieval.AggregatorUtil#deaggregate(List, BigInteger,
-     * BigInteger)} drops records not matching the expected hash key range.
+     * <p>Hash key ranges of shards are expected to be only slowly changing and get refreshed
+     * infrequently. If using an {@link ExplicitPartitioner} or disabling shard refresh via {@link
+     * RecordAggregation}, no shard details will be pulled.
      */
     static class AggregatedWriter<T> extends Writer<T> {
       private static final Logger LOG = LoggerFactory.getLogger(AggregatedWriter.class);
+      private static final ObjectPool<String, ShardRanges> SHARDRANGES_BY_STREAM =
+          new ObjectPool<>(ShardRanges::of);
 
       private final RecordAggregation aggSpec;
       private final Map<BigInteger, RecordsAggregator> aggregators;
-      private final MessageDigest md5Digest;
+      private final PartitionKeyHasher pkHasher;
+
+      private final ShardRanges shardRanges;
 
       AggregatedWriter(PipelineOptions options, Write<T> spec, RecordAggregation aggSpec) {
         super(options, spec);
         this.aggSpec = aggSpec;
-        this.aggregators = new LinkedHashMap<>();
-        this.md5Digest = md5Digest();
+        aggregators = new LinkedHashMap<>();
+        pkHasher = new PartitionKeyHasher();
+        if (aggSpec.shardRefreshInterval().isLongerThan(Duration.ZERO)
+            && !(spec.partitioner() instanceof ExplicitPartitioner)) {
+          shardRanges = SHARDRANGES_BY_STREAM.retain(spec.streamName());
+          shardRanges.refreshPeriodically(kinesis, aggSpec::nextShardRefresh);
+        } else {
+          shardRanges = ShardRanges.EMPTY;

Review comment:
      No-op `ShardRanges` if using an `ExplicitPartitioner` or if shard refresh is disabled.
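The no-op fallback described in this comment is an instance of the null object pattern. Below is a minimal sketch, assuming shard ranges are represented by the sorted lower bounds of their hash key ranges and that those ranges are contiguous (per the Javadoc in the hunk, a record is assigned the lower bound of its target shard's range). `NoopShardRangesSketch`, `ShardRangesLike`, `of` and `select` are hypothetical names, not Beam's `ShardRanges` API.

```java
import java.math.BigInteger;
import java.util.NavigableSet;

// Hypothetical sketch: ShardRangesLike stands in for Beam's ShardRanges; it is not that API.
final class NoopShardRangesSketch {

  interface ShardRangesLike {
    /** Maps a hash key to the lower bound of its shard's hash key range, if known. */
    BigInteger shardAwareHashKey(BigInteger hashKey);

    /** No-op instance: no shard details are available, so hash keys pass through unchanged. */
    ShardRangesLike EMPTY = hashKey -> hashKey;
  }

  /** Shard-aware instance backed by the sorted lower bounds of all shard hash key ranges. */
  static ShardRangesLike of(NavigableSet<BigInteger> lowerBounds) {
    return hashKey -> {
      // floor() returns the greatest lower bound <= hashKey, i.e. the start of its range.
      BigInteger lower = lowerBounds.floor(hashKey);
      return lower != null ? lower : hashKey;
    };
  }

  /** Mirrors the constructor's decision: shard-aware only if refresh is on and no explicit partitioner. */
  static ShardRangesLike select(
      boolean refreshEnabled, boolean explicitPartitioner, NavigableSet<BigInteger> lowerBounds) {
    return refreshEnabled && !explicitPartitioner ? of(lowerBounds) : ShardRangesLike.EMPTY;
  }
}
```

Returning a shared no-op instance keeps the write path free of null checks; in the writer itself, no shard details are pulled at all in that case, whereas this sketch simply ignores the set it is given.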







[GitHub] [beam] mosche commented on pull request #17113: [BEAM-14104] Support shard aware aggregation in Kinesis writer.

Posted by GitBox <gi...@apache.org>.
mosche commented on pull request #17113:
URL: https://github.com/apache/beam/pull/17113#issuecomment-1071130178


   R: @aromanenko-dev 





[GitHub] [beam] mosche commented on pull request #17113: [BEAM-14104] Support shard aware aggregation in Kinesis writer.

Posted by GitBox <gi...@apache.org>.
mosche commented on pull request #17113:
URL: https://github.com/apache/beam/pull/17113#issuecomment-1073569517


   @aromanenko-dev Not sure if it's feasible in terms of review, but it would be great to get this into the 2.38 release as well :)






[GitHub] [beam] mosche commented on pull request #17113: [BEAM-14104] Support shard aware aggregation in Kinesis writer.

Posted by GitBox <gi...@apache.org>.
mosche commented on pull request #17113:
URL: https://github.com/apache/beam/pull/17113#issuecomment-1073869187


   The same BigQuery integration tests (ITs) are failing.





[GitHub] [beam] mosche commented on a change in pull request #17113: [BEAM-14104] Support shard aware aggregation in Kinesis writer.

Posted by GitBox <gi...@apache.org>.
mosche commented on a change in pull request #17113:
URL: https://github.com/apache/beam/pull/17113#discussion_r830838124



##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java
##########
@@ -947,25 +1003,37 @@ private void validateExplicitHashKey(String hashKey) {
      * with KCL to correctly implement the binary protocol, specifically {@link
      * software.amazon.kinesis.retrieval.kpl.Messages.AggregatedRecord}.
      *
-     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. While the KPL is aware
-     * of effective hash key ranges assigned to each shard, we're not and don't want to be to keep
-     * complexity manageable and avoid the risk of silently loosing records in the KCL:
+     * <p>To aggregate records the best possible way, records are assigned an explicit hash key that
+     * corresponds to the lower bound of the hash key range of the target shard. In case a record
+     * has already an explicit hash key assigned, it is kept unchanged.
      *
-     * <p>{@link software.amazon.kinesis.retrieval.AggregatorUtil#deaggregate(List, BigInteger,
-     * BigInteger)} drops records not matching the expected hash key range.
+     * <p>Hash key ranges of shards are expected to be only slowly changing and get refreshed
+     * infrequently. If using an {@link ExplicitPartitioner} or disabling shard refresh via {@link
+     * RecordAggregation}, no shard details will be pulled.
      */
     static class AggregatedWriter<T> extends Writer<T> {
       private static final Logger LOG = LoggerFactory.getLogger(AggregatedWriter.class);
+      private static final ObjectPool<String, ShardRanges> SHARDRANGES_BY_STREAM =
+          new ObjectPool<>(ShardRanges::of);
 
       private final RecordAggregation aggSpec;
       private final Map<BigInteger, RecordsAggregator> aggregators;
-      private final MessageDigest md5Digest;
+      private final PartitionKeyHasher pkHasher;
+
+      private final ShardRanges shardRanges;
 
       AggregatedWriter(PipelineOptions options, Write<T> spec, RecordAggregation aggSpec) {
         super(options, spec);
         this.aggSpec = aggSpec;
-        this.aggregators = new LinkedHashMap<>();
-        this.md5Digest = md5Digest();
+        aggregators = new LinkedHashMap<>();
+        pkHasher = new PartitionKeyHasher();
+        if (aggSpec.shardRefreshInterval().isLongerThan(Duration.ZERO)
+            && !(spec.partitioner() instanceof ExplicitPartitioner)) {
+          shardRanges = SHARDRANGES_BY_STREAM.retain(spec.streamName());
+          shardRanges.refreshPeriodically(kinesis, aggSpec::nextShardRefresh);
+        } else {
+          shardRanges = ShardRanges.EMPTY;

Review comment:
      No-op `ShardRanges` if using an `ExplicitPartitioner` or if shard refresh is disabled. In that case, shard-aware aggregation won't be available.







[GitHub] [beam] mosche commented on a change in pull request #17113: [BEAM-14104] Support shard aware aggregation in Kinesis writer.

Posted by GitBox <gi...@apache.org>.
mosche commented on a change in pull request #17113:
URL: https://github.com/apache/beam/pull/17113#discussion_r830839450



##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java
##########
@@ -977,20 +1045,36 @@ public void startBundle() {
       @Override
       protected void write(String partitionKey, @Nullable String explicitHashKey, byte[] data)
           throws Throwable {
-        BigInteger hashKey = effectiveHashKey(partitionKey, explicitHashKey);

Review comment:
      All relevant changes are from here until the end of KinesisIO.
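For context on what `write` computes: Kinesis maps a partition key to a shard by hashing it with MD5 into a 128-bit integer, unless an explicit hash key is given. The sketch below is a hypothetical illustration, not Beam's `PartitionKeyHasher` or `AggregatedWriter`, of how records could be grouped per aggregation key. It assumes UTF-8 encoded partition keys and takes the shard mapping as a plain `UnaryOperator` (identity when no shard details are available); `ShardAwareGroupingSketch`, `aggregationKey` and `aggregatorCount` are illustrative names.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical sketch; names are illustrative and not Beam's KinesisIO internals.
final class ShardAwareGroupingSketch {
  private final MessageDigest md5;
  // Maps a hash key to its aggregation key, e.g. the lower bound of the target shard's range.
  private final UnaryOperator<BigInteger> toShardKey;
  // One buffer per aggregation key, in insertion order (akin to Map<BigInteger, RecordsAggregator>).
  private final Map<BigInteger, List<byte[]>> buffers = new LinkedHashMap<>();

  ShardAwareGroupingSketch(UnaryOperator<BigInteger> toShardKey) {
    this.toShardKey = toShardKey;
    try {
      this.md5 = MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 not available", e);
    }
  }

  /** MD5 digest of the (assumed UTF-8) partition key, read as an unsigned 128-bit integer. */
  BigInteger hashPartitionKey(String partitionKey) {
    return new BigInteger(1, md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8)));
  }

  /** Explicit hash keys are kept unchanged; otherwise the hashed partition key is mapped. */
  BigInteger aggregationKey(String partitionKey, String explicitHashKey) {
    if (explicitHashKey != null) {
      return new BigInteger(explicitHashKey);
    }
    return toShardKey.apply(hashPartitionKey(partitionKey));
  }

  /** Buffers the record with all other records sharing the same aggregation key. */
  void write(String partitionKey, String explicitHashKey, byte[] data) {
    BigInteger key = aggregationKey(partitionKey, explicitHashKey);
    buffers.computeIfAbsent(key, k -> new ArrayList<>()).add(data);
  }

  int aggregatorCount() {
    return buffers.size();
  }
}
```

With the identity mapping, every distinct partition key ends up in its own aggregate; once keys are mapped to their shard's lower bound, all records bound for the same shard can share one aggregated record.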







[GitHub] [beam] mosche commented on a change in pull request #17113: [BEAM-14104] Support shard aware aggregation in Kinesis writer.

Posted by GitBox <gi...@apache.org>.
mosche commented on a change in pull request #17113:
URL: https://github.com/apache/beam/pull/17113#discussion_r830837582



##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientPool.java
##########
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.aws2.common;
-
-import java.util.function.BiFunction;
-import org.apache.beam.sdk.io.aws2.options.AwsOptions;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBiMap;
-import org.apache.commons.lang3.tuple.Pair;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
-
-/**
- * Reference counting pool to easily share AWS clients or similar by individual client provider and
- * configuration (optional).
- *
- * <p>NOTE: This relies heavily on the implementation of {@link #equals(Object)} for {@link
- * ProviderT} and {@link ConfigT}. If not implemented properly, clients can't be shared between
- * instances of {@link org.apache.beam.sdk.transforms.DoFn}.
- *
- * @param <ProviderT> Client provider
- * @param <ConfigT> Optional, nullable configuration
- * @param <ClientT> Client
- */
-public class ClientPool<ProviderT, ConfigT, ClientT extends AutoCloseable> {

Review comment:
      More or less identical to `ObjectPool`; sadly, the diff presents it as a removal plus an addition.
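Like the removed `ClientPool`, a reference-counting pool hands out one shared instance per key and closes it only once the last user releases it. A minimal, hypothetical sketch follows; `RefCountingPoolSketch` and its methods are illustrative, and although a `retain` call appears in the KinesisIO hunk, this is not Beam's `ObjectPool` API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of a reference-counting pool; not Beam's ObjectPool.
final class RefCountingPoolSketch<K, V extends AutoCloseable> {

  /** A pooled value together with the number of current users. */
  private static final class Entry<T> {
    final T value;
    int refCount;

    Entry(T value) {
      this.value = value;
    }
  }

  private final Function<K, V> factory;
  private final Map<K, Entry<V>> entries = new HashMap<>();

  RefCountingPoolSketch(Function<K, V> factory) {
    this.factory = factory;
  }

  /** Returns the shared value for key, creating it on first use, and increments its count. */
  synchronized V retain(K key) {
    Entry<V> entry = entries.computeIfAbsent(key, k -> new Entry<>(factory.apply(k)));
    entry.refCount++;
    return entry.value;
  }

  /** Decrements the count; closes and evicts the value when the last user releases it. */
  synchronized boolean release(K key) {
    Entry<V> entry = entries.get(key);
    if (entry == null || --entry.refCount > 0) {
      return false;
    }
    entries.remove(key);
    try {
      entry.value.close();
    } catch (Exception e) {
      throw new IllegalStateException("Failed to close pooled value for " + key, e);
    }
    return true;
  }

  synchronized int size() {
    return entries.size();
  }
}
```

Keying such a pool by stream name, as `SHARDRANGES_BY_STREAM` appears to do, would let all writer instances in one JVM share a single set of shard ranges per stream. As the removed `ClientPool` Javadoc notes, sharing only works if the key type implements `equals` properly.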







[GitHub] [beam] mosche commented on pull request #17113: [BEAM-14104] Support shard aware aggregation in Kinesis writer.

Posted by GitBox <gi...@apache.org>.
mosche commented on pull request #17113:
URL: https://github.com/apache/beam/pull/17113#issuecomment-1073731729


   Run Java PostCommit





[GitHub] [beam] mosche commented on pull request #17113: [BEAM-14104] Support shard aware aggregation in Kinesis writer.

Posted by GitBox <gi...@apache.org>.
mosche commented on pull request #17113:
URL: https://github.com/apache/beam/pull/17113#issuecomment-1073781410


   Run Java PostCommit

