Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/06 16:51:39 UTC

[GitHub] [beam] aromanenko-dev commented on a diff in pull request #17113: [BEAM-14104] Support shard aware aggregation in Kinesis writer.

aromanenko-dev commented on code in PR #17113:
URL: https://github.com/apache/beam/pull/17113#discussion_r844150744


##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java:
##########
@@ -947,25 +1003,37 @@ private void validateExplicitHashKey(String hashKey) {
      * with KCL to correctly implement the binary protocol, specifically {@link
      * software.amazon.kinesis.retrieval.kpl.Messages.AggregatedRecord}.
      *
-     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. While the KPL is aware
-     * of effective hash key ranges assigned to each shard, we're not and don't want to be to keep
-     * complexity manageable and avoid the risk of silently loosing records in the KCL:
+     * <p>To aggregate records the best possible way, records are assigned an explicit hash key that
+     * corresponds to the lower bound of the hash key range of the target shard. In case a record
+     * has already an explicit hash key assigned, it is kept unchanged.
      *
-     * <p>{@link software.amazon.kinesis.retrieval.AggregatorUtil#deaggregate(List, BigInteger,
-     * BigInteger)} drops records not matching the expected hash key range.
+     * <p>Hash key ranges of shards are expected to be only slowly changing and get refreshed
+     * infrequently. If using an {@link ExplicitPartitioner} or disabling shard refresh via {@link
+     * RecordAggregation}, no shard details will be pulled.
      */
     static class AggregatedWriter<T> extends Writer<T> {
       private static final Logger LOG = LoggerFactory.getLogger(AggregatedWriter.class);
+      private static final ObjectPool<String, ShardRanges> SHARDRANGES_BY_STREAM =

Review Comment:
   nit: `SHARD_RANGES_BY_STREAM`
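
   For readers following the Javadoc in this hunk, here is a minimal sketch of what "assigning the lower bound of the target shard's hash key range" could look like. It assumes the usual Kinesis behaviour of hashing the partition key with MD5 into a 128-bit unsigned integer. `ShardLowerBounds` and its methods are hypothetical names for illustration only; this is not the `ShardRanges` implementation from the PR.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.TreeSet;

// Hypothetical helper for illustration; not the ShardRanges class from the PR.
final class ShardLowerBounds {
  // Starting hash keys (lower bounds) of all open shards, sorted ascending.
  private final TreeSet<BigInteger> lowerBounds = new TreeSet<>();

  ShardLowerBounds(BigInteger... startingHashKeys) {
    for (BigInteger key : startingHashKeys) {
      lowerBounds.add(key);
    }
  }

  // Assumption: the partition key is mapped to a 128-bit unsigned integer via MD5.
  static BigInteger hashKey(String partitionKey) {
    try {
      byte[] md5 =
          MessageDigest.getInstance("MD5").digest(partitionKey.getBytes(StandardCharsets.UTF_8));
      return new BigInteger(1, md5); // signum 1 => interpret the bytes as unsigned
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 not available", e);
    }
  }

  // Returns the lower bound of the range containing hashKey, as a decimal string.
  String lowerBoundFor(BigInteger hashKey) {
    BigInteger lower = lowerBounds.floor(hashKey);
    if (lower == null) {
      throw new IllegalStateException("No shard range covers hash key " + hashKey);
    }
    return lower.toString();
  }

  // Explicit hash key to use for a record that only has a partition key.
  String explicitHashKey(String partitionKey) {
    return lowerBoundFor(hashKey(partitionKey));
  }
}
```

   With this, all records targeting the same shard end up with the same explicit hash key, so they can be packed into one aggregated record regardless of their partition keys.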



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ObjectPool.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.common;
+
+import static org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory.buildClient;
+
+import java.util.function.Function;
+import org.apache.beam.sdk.function.ThrowingConsumer;
+import org.apache.beam.sdk.io.aws2.options.AwsOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBiMap;
+import org.apache.commons.lang3.tuple.Pair;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
+import software.amazon.awssdk.core.SdkClient;
+
+/**
+ * Reference counting object pool to easily share & destroy objects.
+ *
+ * <p>NOTE: This relies heavily on the implementation of {@link #equals(Object)} for {@link KeyT}.
+ * If not implemented properly, clients can't be shared between instances of {@link
+ * org.apache.beam.sdk.transforms.DoFn}.
+ *
+ * @param <KeyT>> Key to share objects by
+ * @param <ObjectT>> Shared object
+ */
+public class ObjectPool<KeyT extends @NonNull Object, ObjectT extends @NonNull Object> {

Review Comment:
   Should this class and its methods be `public`, or just package-private?
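
   To make the question concrete, here is a minimal sketch of a reference-counting pool with package-private visibility. `RefCountingPool`, `retain` and `release` are hypothetical names chosen for illustration; the actual `ObjectPool` in the PR is not shown in full in this hunk and may differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, simplified pool for illustration; not the ObjectPool from the PR.
// Package-private on purpose: nothing outside the package can depend on it.
final class RefCountingPool<K, V> {
  private static final class Entry<V> {
    final V value;
    int refs;

    Entry(V value) {
      this.value = value;
    }
  }

  private final Map<K, Entry<V>> entries = new HashMap<>();
  private final Function<K, V> builder;
  private final Consumer<V> finalizer;

  RefCountingPool(Function<K, V> builder, Consumer<V> finalizer) {
    this.builder = builder;
    this.finalizer = finalizer;
  }

  // Returns the shared object for the key, creating it on first use.
  // Sharing relies on equals/hashCode of the key type.
  synchronized V retain(K key) {
    Entry<V> entry = entries.computeIfAbsent(key, k -> new Entry<>(builder.apply(k)));
    entry.refs++;
    return entry.value;
  }

  // Drops one reference; the object is destroyed once nobody uses it anymore.
  synchronized void release(K key) {
    Entry<V> entry = entries.get(key);
    if (entry == null) {
      return; // unknown key, nothing to release
    }
    if (--entry.refs == 0) {
      entries.remove(key);
      finalizer.accept(entry.value);
    }
  }
}
```

   Keeping the class and its methods package-private, as in this sketch, would avoid committing to it as user-facing API.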



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisPartitioner.java:
##########
@@ -47,6 +47,26 @@
     return null;
   }
 
+  /**
+   * An explicit partitioner that always returns a {@code Nonnull} explicit hash key. The partition
+   * key is irrelevant in this case, though it cannot be {@code null}.
+   */
+  interface ExplicitPartitioner<T> extends KinesisPartitioner<T> {
+    @Override
+    default @Nonnull String getPartitionKey(T record) {
+      return "a"; // will be ignored, but can't be null

Review Comment:
   Why not just return an empty string?



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientPool.java:
##########
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.aws2.common;
-
-import java.util.function.BiFunction;
-import org.apache.beam.sdk.io.aws2.options.AwsOptions;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBiMap;
-import org.apache.commons.lang3.tuple.Pair;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
-
-/**
- * Reference counting pool to easily share AWS clients or similar by individual client provider and
- * configuration (optional).
- *
- * <p>NOTE: This relies heavily on the implementation of {@link #equals(Object)} for {@link
- * ProviderT} and {@link ConfigT}. If not implemented properly, clients can't be shared between
- * instances of {@link org.apache.beam.sdk.transforms.DoFn}.
- *
- * @param <ProviderT> Client provider
- * @param <ConfigT> Optional, nullable configuration
- * @param <ClientT> Client
- */
-public class ClientPool<ProviderT, ConfigT, ClientT extends AutoCloseable> {

Review Comment:
   Was it `public` just by chance, or intentionally? Could removing it break something for users?



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/RetryConfiguration.java:
##########
@@ -68,7 +68,7 @@
   public abstract RetryConfiguration.Builder toBuilder();
 
   public static Builder builder() {
-    return Builder.builder();
+    return Builder.builder().numRetries(3);

Review Comment:
   Why was "3" chosen as the number of retries?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org