Posted to github@beam.apache.org by "mosche (via GitHub)" <gi...@apache.org> on 2023/03/20 12:23:10 UTC

[GitHub] [beam] mosche commented on a diff in pull request #23540: Kinesis enhanced fanout

mosche commented on code in PR #23540:
URL: https://github.com/apache/beam/pull/23540#discussion_r1141999895


##########
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/StaticCheckpointGeneratorTest.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Base64;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Test;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
+public class StaticCheckpointGeneratorTest {
+  /**
+   * {@link KinesisSource} serialization changed after some changes in its data.
+   *
+   * <p>Incompatibility caused failures in the following scenario:
+   *
+   * <ol>
+   *   <li>Create Flink savepoint with Beam 2.46.0
+   *   <li>Start from Flink savepoint with Beam 2.47.0-SNAPSHOT in development branch
+   * </ol>
+   *
+   * <p>Neither of {@link CheckpointGenerator} implementations had explicit serialVersionUID, which
+   * caused errors:
+   *
+   * <p><small>Caused by: java.io.InvalidClassException:
+   * org.apache.beam.sdk.io.aws2.kinesis.StaticCheckpointGenerator; local class incompatible: stream
+   * classdesc serialVersionUID = 5972850685627641931, local class serialVersionUID =
+   * -1716374792629517553</small>
+   *
+   * <p><small>Caused by: java.io.InvalidClassException:
+   * org.apache.beam.sdk.io.aws2.kinesis.ShardCheckpoint; local class incompatible: stream classdesc
+   * serialVersionUID = 103536540299998471, local class serialVersionUID =
+   * 2842489499429532931</small>
+   */
+  @Test
+  public void testCompatibility() throws IOException, ClassNotFoundException {
+    ShardCheckpoint shardCheckpoint =
+        new ShardCheckpoint(
+            "stream-01", "shard-000", ShardIteratorType.AFTER_SEQUENCE_NUMBER, "42", 12L);
+    KinesisReaderCheckpoint checkpoint =
+        new KinesisReaderCheckpoint(ImmutableList.of(shardCheckpoint));
+    StaticCheckpointGenerator generator = new StaticCheckpointGenerator(checkpoint);
+    String serializedGenerator = serializeObjectToString(generator);
+    // This sequence generated with Beam release 2.46.0 and AdoptOpenJDK-11.0.11+9

Review Comment:
   Please remove the JDK here; it's potentially confusing. What's relevant is that it was generated using the "official" Beam 2.46 artifact published to Maven.



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/StaticCheckpointGenerator.java:
##########
@@ -19,18 +19,18 @@
 
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
 
-/** Always returns the same instance of checkpoint. */
 class StaticCheckpointGenerator implements CheckpointGenerator {
+  private static final long serialVersionUID = 5972850685627641931L;

Review Comment:
   > My fixes included adding serialVersionUID into two classes, and I'm not 100% sure it's actually allowed, can you confirm? 
   
   Not defining a `serialVersionUID` is poor practice for `Serializable` classes that require it. Unfortunately, that's how things started with this IO. Since modifying a class risks changing its default `serialVersionUID`, we should absolutely set it explicitly (here, but also for all other classes of this IO that get serialized).
   
   Please make sure to extract it from the latest release artifact (2.46) using the JDK's [serialver](https://docs.oracle.com/javase/8/docs/technotes/tools/unix/serialver.html) tool and document this in a comment.
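
To illustrate the difference between a default and an explicit value, here is a minimal sketch using `java.io.ObjectStreamClass`, which reports the `serialVersionUID` the serialization runtime uses for a class (the same value `serialver` prints). The classes `WithoutExplicitUid` and `WithExplicitUid` and the value `42L` are hypothetical and not part of Beam.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

final class SerialVersionUidDemo {
  /** Hypothetical class that relies on the computed default serialVersionUID. */
  static class WithoutExplicitUid implements Serializable {
    String streamName = "stream-01";
  }

  /** Hypothetical class that pins its serialVersionUID explicitly. */
  static class WithExplicitUid implements Serializable {
    private static final long serialVersionUID = 42L;
    String streamName = "stream-01";
  }

  /** Returns the serialVersionUID the serialization runtime uses for the given class. */
  static long uidOf(Class<?> clazz) {
    return ObjectStreamClass.lookup(clazz).getSerialVersionUID();
  }

  public static void main(String[] args) {
    // Derived from class details (name, fields, methods, ...), so it can change with the class.
    System.out.println("default:  " + uidOf(WithoutExplicitUid.class));
    // Always the declared value, independent of later changes to the class.
    System.out.println("explicit: " + uidOf(WithExplicitUid.class));
  }
}
```

Running the same lookup against the class from the released artifact would be one way to obtain the value to pin, assuming that artifact is on the classpath.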
   
   Note that the resulting `serialVersionUID` depends on the class file the compiler produces, not on the JDK used at runtime:
   
   > "[I]t is strongly recommended that all serializable classes explicitly declare serialVersionUID values, since the default serialVersionUID computation is highly sensitive to class details that may vary depending on compiler implementations, and can thus result in unexpected InvalidClassExceptions during deserialization. Therefore, to guarantee a consistent serialVersionUID value across different Java compiler implementations, a serializable class must declare an explicit serialVersionUID value."



##########
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/StaticCheckpointGeneratorTest.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Base64;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Test;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
+public class StaticCheckpointGeneratorTest {
+  /**
+   * {@link KinesisSource} serialization changed after some changes in its data.
+   *
+   * <p>Incompatibility caused failures in the following scenario:
+   *
+   * <ol>
+   *   <li>Create Flink savepoint with Beam 2.46.0
+   *   <li>Start from Flink savepoint with Beam 2.47.0-SNAPSHOT in development branch
+   * </ol>
+   *
+   * <p>Neither of {@link CheckpointGenerator} implementations had explicit serialVersionUID, which
+   * caused errors:
+   *
+   * <p><small>Caused by: java.io.InvalidClassException:
+   * org.apache.beam.sdk.io.aws2.kinesis.StaticCheckpointGenerator; local class incompatible: stream
+   * classdesc serialVersionUID = 5972850685627641931, local class serialVersionUID =
+   * -1716374792629517553</small>
+   *
+   * <p><small>Caused by: java.io.InvalidClassException:
+   * org.apache.beam.sdk.io.aws2.kinesis.ShardCheckpoint; local class incompatible: stream classdesc
+   * serialVersionUID = 103536540299998471, local class serialVersionUID =
+   * 2842489499429532931</small>
+   */
+  @Test
+  public void testCompatibility() throws IOException, ClassNotFoundException {

Review Comment:
   > I'm also not sure if it is common practice to keep tests like StaticCheckpointGeneratorTest.testCompatibility(), but it gave me faster code iterations.
   
   That should help catch potential issues. Totally right to add / keep this 👍 
   I'd just recommend renaming it to `testJavaSerialization` to be a bit more specific.
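
As a rough illustration of what such a test exercises, here is a minimal sketch of a Java serialization round trip through a Base64 string. `DemoCheckpoint` and the helper names are hypothetical stand-ins, not the Beam classes or the helpers used in the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

final class JavaSerializationRoundTrip {
  /** Hypothetical stand-in for a checkpoint class; not the Beam ShardCheckpoint. */
  static class DemoCheckpoint implements Serializable {
    private static final long serialVersionUID = 1L;
    final String shardId;
    final long subSequenceNumber;

    DemoCheckpoint(String shardId, long subSequenceNumber) {
      this.shardId = shardId;
      this.subSequenceNumber = subSequenceNumber;
    }
  }

  /** Serializes the object with Java serialization and encodes the bytes as Base64. */
  static String serializeToBase64(Serializable obj) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    return Base64.getEncoder().encodeToString(bytes.toByteArray());
  }

  /** Decodes the Base64 string and deserializes the contained object. */
  static Object deserializeFromBase64(String encoded) throws IOException, ClassNotFoundException {
    byte[] data = Base64.getDecoder().decode(encoded);
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    String encoded = serializeToBase64(new DemoCheckpoint("shard-000", 12L));
    DemoCheckpoint restored = (DemoCheckpoint) deserializeFromBase64(encoded);
    System.out.println(restored.shardId + " / " + restored.subSequenceNumber);
  }
}
```

A compatibility test would deserialize a string captured from the released artifact instead of one produced in the same run, so that an accidental change to the serialized form fails the build.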



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardListingCheckpointGenerator.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+import software.amazon.awssdk.services.kinesis.model.ShardFilter;
+import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
+
+class ShardListingCheckpointGenerator implements CheckpointGenerator {
+  private static final Logger LOG = LoggerFactory.getLogger(ShardListingCheckpointGenerator.class);
+  private static final int shardListingTimeoutMs = 10_000;
+
+  private final KinesisIO.Read spec;
+
+  public ShardListingCheckpointGenerator(KinesisIO.Read spec) {
+    this.spec = spec;
+  }
+
+  @Override
+  public <ClientT> KinesisReaderCheckpoint generate(ClientT client)
+      throws TransientKinesisException {
+    StartingPoint startingPoint = spec.getInitialPosition();
+
+    if (client instanceof SimplifiedKinesisClient) {
+      SimplifiedKinesisClient kinesis = (SimplifiedKinesisClient) client;
+      List<Shard> streamShards =
+          kinesis.listShardsAtPoint(
+              Preconditions.checkArgumentNotNull(spec.getStreamName()),
+              Preconditions.checkArgumentNotNull(startingPoint));
+
+      LOG.info("Creating a checkpoint with following shards {} at {}", streamShards, startingPoint);
+      return new KinesisReaderCheckpoint(
+          streamShards.stream()
+              .map(
+                  shard ->
+                      new ShardCheckpoint(
+                          Preconditions.checkArgumentNotNull(spec.getStreamName()),
+                          shard.shardId(),
+                          Preconditions.checkArgumentNotNull(startingPoint)))
+              .collect(Collectors.toList()));
+    } else if (client instanceof KinesisAsyncClient) {
+      KinesisAsyncClient kinesis = (KinesisAsyncClient) client;
+      List<ShardCheckpoint> streamShards = generateShardsCheckpoints(spec, kinesis);
+      LOG.info("Creating a checkpoint with following shards {} at {}", streamShards, startingPoint);
+      return new KinesisReaderCheckpoint(streamShards);
+    } else {
+      throw new IllegalStateException(String.format("Unknown type of client %s", client));
+    }
+  }
+
+  private static List<ShardCheckpoint> generateShardsCheckpoints(
+      KinesisIO.Read readSpec, KinesisAsyncClient kinesis) {
+    ListShardsRequest listShardsRequest =
+        ListShardsRequest.builder()
+            .streamName(checkArgumentNotNull(readSpec.getStreamName()))
+            .shardFilter(buildFilter(readSpec))
+            .build();
+
+    ListShardsResponse response = tryListingShards(listShardsRequest, kinesis);
+    return response.shards().stream()
+        .map(
+            s ->
+                new ShardCheckpoint(
+                    checkArgumentNotNull(readSpec.getStreamName()),
+                    s.shardId(),
+                    checkArgumentNotNull(readSpec.getInitialPosition())))
+        .collect(Collectors.toList());
+  }
+
+  private static ListShardsResponse tryListingShards(
+      ListShardsRequest listShardsRequest, KinesisAsyncClient kinesis) {
+    try {
+      LOG.info("Starting ListShardsRequest {}", listShardsRequest);
+      ListShardsResponse response =
+          kinesis.listShards(listShardsRequest).get(shardListingTimeoutMs, TimeUnit.MILLISECONDS);
+      LOG.info("Shards found = {}", response.shards());
+      return response;
+    } catch (ExecutionException | InterruptedException | TimeoutException e) {
+      LOG.error("Error listing shards {}", e.getMessage());

Review Comment:
   Please don't swallow exceptions: either rethrow, wrap (as cause), or log the exception itself (and not just the message).
   When testing, I misspelled my stream name, and this made it unnecessarily hard to find the issue.
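
A minimal sketch of the "wrap as cause" option, using only the JDK. The class and method names and the choice of `IllegalStateException` are assumptions for illustration, not what the PR ended up doing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ExceptionPropagationDemo {
  /** Waits for the future and rethrows failures with the original exception kept as cause. */
  static <T> T getOrWrap(CompletableFuture<T> future, long timeoutMs) {
    try {
      return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt flag for callers
      throw new IllegalStateException("Interrupted while listing shards", e);
    } catch (ExecutionException | TimeoutException e) {
      // Passing e as the cause keeps the full stack trace and the root failure.
      throw new IllegalStateException("Error listing shards", e);
    }
  }

  public static void main(String[] args) {
    CompletableFuture<String> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalArgumentException("Stream not found"));
    try {
      getOrWrap(failed, 1_000);
    } catch (IllegalStateException e) {
      // The printed trace includes the root cause, not just a message.
      e.printStackTrace();
    }
  }
}
```

If logging is preferred over rethrowing, SLF4J logs the stack trace when the exception is passed as the last argument, e.g. `LOG.error("Error listing shards", e)`.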



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java:
##########
@@ -147,6 +150,42 @@
  * Read#withCustomRateLimitPolicy(RateLimitPolicyFactory)}. This requires implementing {@link
  * RateLimitPolicy} with a corresponding {@link RateLimitPolicyFactory}.
  *
+ * <h4>Enhanced Fan-Out</h4>
+ *
+ * Kinesis IO supports Consumers with Dedicated Throughput (Enhanced Fan-Out, EFO). This type of
+ * consumer doesn't have to contend with other consumers that are receiving data from the stream.
+ *
+ * <p>More details can be found here: <a
+ * href="https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html">Consumers with
+ * Dedicated Throughput</a>
+ *
+ * <p>EFO can be enabled for one or more {@link KinesisIO.Read} instances via pipeline options:

Review Comment:
   For visibility / discoverability, it might be good to keep `consumerArn` as part of the read spec.
   Whatever is provided there would be the default, unless overridden by the pipeline option. 
   Thoughts?
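
The proposed precedence could be sketched roughly as follows, assuming the pipeline option is exposed as a plain map from stream name to consumer ARN. The class, method, and parameter names are hypothetical and not Beam API.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

final class ConsumerArnResolution {
  /**
   * Resolves the consumer ARN for a stream: an entry in the pipeline-option map wins,
   * otherwise the value from the read spec is used. An empty result means no consumer ARN.
   */
  static Optional<String> resolveConsumerArn(
      String streamName, String specConsumerArn, Map<String, String> optionOverrides) {
    String override = optionOverrides.get(streamName);
    return Optional.ofNullable(override != null ? override : specConsumerArn);
  }

  public static void main(String[] args) {
    Map<String, String> overrides = Collections.singletonMap("stream-01", "arn-from-options");

    // The pipeline option overrides the value from the read spec.
    System.out.println(resolveConsumerArn("stream-01", "arn-from-spec", overrides));
    // No override for this stream: the read spec value is the default.
    System.out.println(resolveConsumerArn("stream-02", "arn-from-spec", overrides));
    // Neither is set: no consumer ARN.
    System.out.println(resolveConsumerArn("stream-03", null, overrides));
  }
}
```

How an override would express "disable EFO" for a stream whose read spec sets an ARN is left open in this sketch.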



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/KinesisIOOptions.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.options;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Allows passing modify-able options for {@link org.apache.beam.sdk.io.aws2.kinesis.KinesisIO}.
+ *
+ * <p>This class is not bound to source only and can have modifiable options for sink, too.
+ *
+ * <p>This class appeared during the implementation of EFO consumer. {@link
+ * org.apache.beam.sdk.io.aws2.kinesis.KinesisIO.Read} is serialized with the entire Source object
+ * (at least in Flink runner) which was a trouble for EFO feature design: if consumer ARN is part of
+ * KinesisIO.Read object, when started from a Flink savepoint, consumer ARN string or null value
+ * would be forced from the savepoint. Consequences of this are:
+ *
+ * <ol>
+ *   <li>Once a Kinesis source is started, its consumer ARN can't be changed without loosing state
+ *       (checkpoint-ed shard progress).
+ *   <li>Kinesis source can not have seamless enabling / disabling of EFO feature without loosing
+ *       state (checkpoint-ed shard progress).
+ * </ol>
+ *
+ * <p>This {@link PipelineOptions} extension allows having modifiable configurations for {@link
+ * org.apache.beam.sdk.io.UnboundedSource#split(int, PipelineOptions)} and {@link
+ * org.apache.beam.sdk.io.UnboundedSource#createReader(PipelineOptions,
+ * UnboundedSource.CheckpointMark)}, which is essential for seamless EFO switch on / off.
+ */
+@Experimental(Kind.SOURCE_SINK)
+public interface KinesisIOOptions extends PipelineOptions {

Review Comment:
   This could move into the kinesis package. If kept in the general/shared options package, it might as well use the existing registrar.



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/KinesisIOOptions.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.options;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Allows passing modify-able options for {@link org.apache.beam.sdk.io.aws2.kinesis.KinesisIO}.
+ *
+ * <p>This class is not bound to source only and can have modifiable options for sink, too.
+ *
+ * <p>This class appeared during the implementation of EFO consumer. {@link
+ * org.apache.beam.sdk.io.aws2.kinesis.KinesisIO.Read} is serialized with the entire Source object
+ * (at least in Flink runner) which was a trouble for EFO feature design: if consumer ARN is part of
+ * KinesisIO.Read object, when started from a Flink savepoint, consumer ARN string or null value
+ * would be forced from the savepoint. Consequences of this are:
+ *
+ * <ol>
+ *   <li>Once a Kinesis source is started, its consumer ARN can't be changed without loosing state
+ *       (checkpoint-ed shard progress).
+ *   <li>Kinesis source can not have seamless enabling / disabling of EFO feature without loosing
+ *       state (checkpoint-ed shard progress).
+ * </ol>
+ *
+ * <p>This {@link PipelineOptions} extension allows having modifiable configurations for {@link
+ * org.apache.beam.sdk.io.UnboundedSource#split(int, PipelineOptions)} and {@link
+ * org.apache.beam.sdk.io.UnboundedSource#createReader(PipelineOptions,
+ * UnboundedSource.CheckpointMark)}, which is essential for seamless EFO switch on / off.
+ */
+@Experimental(Kind.SOURCE_SINK)
+public interface KinesisIOOptions extends PipelineOptions {
+  /**
+   * {@link KinesisSourceToConsumerMapping} used to enable / disable EFO.
+   *
+   * <p>Example:
+   *
+   * <pre>{@code --kinesisSourceToConsumerMapping={
+   *   "stream-01": "arn:aws:kinesis:...:stream/stream-01/consumer/consumer-01:1678576714",
+   *   "stream-02": "arn:aws:kinesis:...:stream/stream-02/consumer/my-consumer:1679576982",
+   *   ...
+   * }}</pre>
+   */
+  @Description("Mapping of streams' names to consumer ARNs of those streams")
+  @Default.InstanceFactory(
+      KinesisSourceToConsumerMapping.KinesisSourceToConsumerMappingFactory.class)
+  KinesisSourceToConsumerMapping getKinesisSourceToConsumerMapping();

Review Comment:
   Please make sure to prefix this with the full IO name. I'd also recommend that the name include `consumerArn` somehow, as this is the term people are familiar with, e.g. `getKinesisIOConsumerArns()`.
   
   Why do you want to use a custom type instead of a map? The abstraction just hides from users that they are dealing with a simple map, forcing them to dig into it without any clear benefit.


