You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "vahmed-hamdy (via GitHub)" <gi...@apache.org> on 2023/04/26 08:08:20 UTC

[GitHub] [flink-connector-aws] vahmed-hamdy commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

vahmed-hamdy commented on code in PR #49:
URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1177447977


##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants;
+import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumerator;
+import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorState;
+import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorStateSerializer;
+import org.apache.flink.connector.kinesis.source.proxy.KinesisStreamProxy;
+import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsRecordEmitter;
+import org.apache.flink.connector.kinesis.source.reader.PollingKinesisShardSplitReader;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.utils.AttributeMap;
+
+import java.util.Properties;
+import java.util.function.Supplier;
+
+@PublicEvolving
+public class KinesisStreamsSource<T>
+        implements Source<T, KinesisShardSplit, KinesisStreamsSourceEnumeratorState> {
+
+    private final String streamArn;
+    private final Properties consumerConfig;
+    private final DeserializationSchema<T> deserializationSchema;
+
+    public KinesisStreamsSource(
+            String streamArn,
+            Properties consumerConfig,
+            DeserializationSchema<T> deserializationSchema) {
+        this.streamArn = streamArn;
+        this.consumerConfig = consumerConfig;
+        this.deserializationSchema = deserializationSchema;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<T, KinesisShardSplit> createReader(SourceReaderContext readerContext)
+            throws Exception {
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Record>> elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+        KinesisStreamProxy kinesisStreamProxy = createKinesisStreamProxy(consumerConfig);
+        Supplier<PollingKinesisShardSplitReader> splitReaderSupplier =
+                () -> new PollingKinesisShardSplitReader(kinesisStreamProxy);
+        KinesisStreamsRecordEmitter<T> recordEmitter =
+                new KinesisStreamsRecordEmitter<>(deserializationSchema);
+
+        return new KinesisStreamsSourceReader<>(
+                elementsQueue,
+                new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier::get),
+                recordEmitter,
+                toConfiguration(consumerConfig),
+                readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> createEnumerator(
+            SplitEnumeratorContext<KinesisShardSplit> enumContext) throws Exception {
+        return restoreEnumerator(enumContext, null);
+    }
+
+    @Override
+    public SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState>
+            restoreEnumerator(
+                    SplitEnumeratorContext<KinesisShardSplit> enumContext,
+                    KinesisStreamsSourceEnumeratorState checkpoint)
+                    throws Exception {
+        return new KinesisStreamsSourceEnumerator(
+                enumContext,
+                streamArn,
+                consumerConfig,
+                createKinesisStreamProxy(consumerConfig),
+                checkpoint);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<KinesisShardSplit> getSplitSerializer() {
+        return new KinesisShardSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<KinesisStreamsSourceEnumeratorState>
+            getEnumeratorCheckpointSerializer() {
+        return new KinesisStreamsSourceEnumeratorStateSerializer(new KinesisShardSplitSerializer());
+    }
+
+    private Configuration toConfiguration(Properties props) {
+        Configuration config = new Configuration();
+        props.stringPropertyNames().forEach(key -> config.setString(key, props.getProperty(key)));
+        return config;
+    }
+
+    private KinesisStreamProxy createKinesisStreamProxy(Properties consumerConfig) {
+        SdkHttpClient httpClient =
+                AWSGeneralUtil.createSyncHttpClient(
+                        AttributeMap.builder().build(), ApacheHttpClient.builder());
+
+        AWSGeneralUtil.validateAwsCredentials(consumerConfig);

Review Comment:
   Should we add a test for this validation?



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.InitialPosition;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION;
+import static org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.STREAM_INITIAL_POSITION;
+import static org.apache.flink.connector.kinesis.source.config.SourceConfigUtil.parseStreamTimestampStartingPosition;
+
+@PublicEvolving
+public class KinesisStreamsSourceEnumerator
+    implements SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
+
+    private final SplitEnumeratorContext<KinesisShardSplit> context;
+    private final String streamArn;
+    private final Properties consumerConfig;
+    private final StreamProxy streamProxy;
+    private final KinesisShardAssigner shardAssigner;
+    private final KinesisShardAssigner.Context shardAssignerContext;
+
+    private final Map<Integer, Set<KinesisShardSplit>> splitAssignment = new HashMap<>();
+    private final Set<KinesisShardSplit> assignedSplits = new HashSet<>();
+    private final Set<KinesisShardSplit> unassignedSplits;
+    private final Set<String> completedShards;
+
+    private String lastSeenShardId;
+
+    public KinesisStreamsSourceEnumerator(
+        SplitEnumeratorContext<KinesisShardSplit> context,
+        String streamArn,
+        Properties consumerConfig,
+        StreamProxy streamProxy,
+        KinesisStreamsSourceEnumeratorState state) {
+        this.context = context;
+        this.streamArn = streamArn;
+        this.consumerConfig = consumerConfig;
+        this.streamProxy = streamProxy;
+        this.shardAssigner = new HashShardAssigner();
+        this.shardAssignerContext =
+            new ShardAssignerContext(splitAssignment, context.registeredReaders());
+        if (state == null) {
+            this.completedShards = new HashSet<>();
+            this.lastSeenShardId = null;
+            this.unassignedSplits = new HashSet<>();

Review Comment:
   nit: can use Optional



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/SourceConfigConstants.java:
##########
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.kinesis.source.reader.PollingKinesisShardSplitReader;
+
+import java.time.Duration;
+
+@PublicEvolving
+public class SourceConfigConstants extends AWSConfigConstants {
+    public enum InitialPosition {
+        LATEST,
+        TRIM_HORIZON,
+        AT_TIMESTAMP
+    }
+
+    /** The record publisher type represents the record-consume style. */
+    public enum RecordPublisherType {
+
+        /** Consume the Kinesis records using AWS SDK v2 with the enhanced fan-out consumer. */
+        EFO,
+        /** Consume the Kinesis records using AWS SDK v1 with the get-records method. */
+        POLLING
+    }
+
+    /** The EFO registration type represents how we are going to de-/register efo consumer. */
+    public enum EFORegistrationType {
+
+        /**
+         * Delay the registration of efo consumer for taskmanager to execute. De-register the efo
+         * consumer for taskmanager to execute when task is shut down.
+         */
+        LAZY,
+        /**
+         * Register the efo consumer eagerly for jobmanager to execute. De-register the efo consumer
+         * the same way as lazy does.
+         */
+        EAGER,
+        /** Do not register efo consumer programmatically. Do not de-register either. */
+        NONE
+    }
+
+    /** The RecordPublisher type (EFO|POLLING). */
+    public static final String RECORD_PUBLISHER_TYPE = "flink.stream.recordpublisher";
+
+    public static final String DEFAULT_RECORD_PUBLISHER_TYPE =
+            RecordPublisherType.POLLING.toString();
+
+    /** Determine how and when consumer de-/registration is performed (LAZY|EAGER|NONE). */
+    public static final String EFO_REGISTRATION_TYPE = "flink.stream.efo.registration";
+
+    public static final String DEFAULT_EFO_REGISTRATION_TYPE = EFORegistrationType.EAGER.toString();
+
+    /** The name of the EFO consumer to register with KDS. */
+    public static final String EFO_CONSUMER_NAME = "flink.stream.efo.consumername";
+
+    /** The prefix of consumer ARN for a given stream. */
+    public static final String EFO_CONSUMER_ARN_PREFIX = "flink.stream.efo.consumerarn";
+
+    /** The initial position to start reading Kinesis streams from. */
+    public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
+
+    public static final String DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST.toString();
+
+    /**
+     * The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for
+     * STREAM_INITIAL_POSITION).
+     */
+    public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";
+
+    /**
+     * The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP
+     * is set for STREAM_INITIAL_POSITION).
+     */
+    public static final String STREAM_TIMESTAMP_DATE_FORMAT =
+            "flink.stream.initpos.timestamp.format";
+
+    public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT =
+            "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
+
+    /** The maximum number of describeStream attempts if we get a recoverable exception. */
+    public static final String STREAM_DESCRIBE_RETRIES = "flink.stream.describe.maxretries";
+
+    public static final int DEFAULT_STREAM_DESCRIBE_RETRIES = 50;
+
+    /** The base backoff time between each describeStream attempt. */
+    public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
+
+    public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 2000L;
+
+    /** The maximum backoff time between each describeStream attempt. */
+    public static final String STREAM_DESCRIBE_BACKOFF_MAX = "flink.stream.describe.backoff.max";
+
+    public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L;
+
+    /** The power constant for exponential backoff between each describeStream attempt. */
+    public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT =
+            "flink.stream.describe.backoff.expconst";
+
+    public static final double DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+    /** The maximum number of listShards attempts if we get a recoverable exception. */
+    public static final String LIST_SHARDS_RETRIES = "flink.list.shards.maxretries";
+
+    public static final int DEFAULT_LIST_SHARDS_RETRIES = 10;
+
+    /** The base backoff time between each listShards attempt. */
+    public static final String LIST_SHARDS_BACKOFF_BASE = "flink.list.shards.backoff.base";
+
+    public static final long DEFAULT_LIST_SHARDS_BACKOFF_BASE = 1000L;
+
+    /** The maximum backoff time between each listShards attempt. */
+    public static final String LIST_SHARDS_BACKOFF_MAX = "flink.list.shards.backoff.max";
+
+    public static final long DEFAULT_LIST_SHARDS_BACKOFF_MAX = 5000L;
+
+    /** The power constant for exponential backoff between each listShards attempt. */
+    public static final String LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT =
+            "flink.list.shards.backoff.expconst";
+
+    public static final double DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+    /** The maximum number of describeStreamConsumer attempts if we get a recoverable exception. */
+    public static final String DESCRIBE_STREAM_CONSUMER_RETRIES =
+            "flink.stream.describestreamconsumer.maxretries";
+
+    public static final int DEFAULT_DESCRIBE_STREAM_CONSUMER_RETRIES = 50;
+
+    /** The base backoff time between each describeStreamConsumer attempt. */
+    public static final String DESCRIBE_STREAM_CONSUMER_BACKOFF_BASE =
+            "flink.stream.describestreamconsumer.backoff.base";
+
+    public static final long DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_BASE = 2000L;
+
+    /** The maximum backoff time between each describeStreamConsumer attempt. */
+    public static final String DESCRIBE_STREAM_CONSUMER_BACKOFF_MAX =
+            "flink.stream.describestreamconsumer.backoff.max";
+
+    public static final long DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_MAX = 5000L;
+
+    /** The power constant for exponential backoff between each describeStreamConsumer attempt. */
+    public static final String DESCRIBE_STREAM_CONSUMER_BACKOFF_EXPONENTIAL_CONSTANT =
+            "flink.stream.describestreamconsumer.backoff.expconst";
+
+    public static final double DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+    /** The maximum number of registerStreamConsumer attempts if we get a recoverable exception. */
+    public static final String REGISTER_STREAM_CONSUMER_RETRIES =
+            "flink.stream.registerstreamconsumer.maxretries";
+
+    public static final int DEFAULT_REGISTER_STREAM_CONSUMER_RETRIES = 10;
+
+    /**
+     * The maximum time in seconds to wait for a stream consumer to become active before giving up.
+     */
+    public static final String REGISTER_STREAM_CONSUMER_TIMEOUT_SECONDS =
+            "flink.stream.registerstreamconsumer.timeout";
+
+    public static final Duration DEFAULT_REGISTER_STREAM_CONSUMER_TIMEOUT = Duration.ofSeconds(60);
+
+    /** The base backoff time between each registerStreamConsumer attempt. */
+    public static final String REGISTER_STREAM_CONSUMER_BACKOFF_BASE =
+            "flink.stream.registerstreamconsumer.backoff.base";
+
+    public static final long DEFAULT_REGISTER_STREAM_CONSUMER_BACKOFF_BASE = 500L;
+
+    /** The maximum backoff time between each registerStreamConsumer attempt. */
+    public static final String REGISTER_STREAM_CONSUMER_BACKOFF_MAX =
+            "flink.stream.registerstreamconsumer.backoff.max";
+
+    public static final long DEFAULT_REGISTER_STREAM_CONSUMER_BACKOFF_MAX = 2000L;
+
+    /** The power constant for exponential backoff between each registerStreamConsumer attempt. */
+    public static final String REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT =
+            "flink.stream.registerstreamconsumer.backoff.expconst";
+
+    public static final double DEFAULT_REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+    /**
+     * The maximum number of deregisterStreamConsumer attempts if we get a recoverable exception.
+     */
+    public static final String DEREGISTER_STREAM_CONSUMER_RETRIES =
+            "flink.stream.deregisterstreamconsumer.maxretries";
+
+    public static final int DEFAULT_DEREGISTER_STREAM_CONSUMER_RETRIES = 10;
+
+    /** The maximum time in seconds to wait for a stream consumer to deregister before giving up. */
+    public static final String DEREGISTER_STREAM_CONSUMER_TIMEOUT_SECONDS =
+            "flink.stream.deregisterstreamconsumer.timeout";
+
+    public static final Duration DEFAULT_DEREGISTER_STREAM_CONSUMER_TIMEOUT =
+            Duration.ofSeconds(60);
+
+    /** The base backoff time between each deregisterStreamConsumer attempt. */
+    public static final String DEREGISTER_STREAM_BACKOFF_BASE =

Review Comment:
   @darenwkt I think we need to maintain backward compatibility here. Let's keep it similar to 
   https://github.com/apache/flink-connector-aws/blob/17820d062007a4fcdf24c3b974c636f32705cd27/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java#L203



##########
flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.java:
##########
@@ -0,0 +1,381 @@
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.apache.flink.connector.kinesis.source.config.SourceConfigConstants;
+import org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.InitialPosition;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.util.TestUtil;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider.TestKinesisStreamProxy;
+import static org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider.getTestStreamProxy;
+import static org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId;
+import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatNoException;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+class KinesisStreamsSourceEnumeratorTest {
+
+    private static final int NUM_SUBTASKS = 5;
+    private static final String STREAM_ARN = "arn:aws:kinesis:us-east-1:123456789012:stream/keenesesStream";
+
+    @ParameterizedTest
+    @MethodSource("provideInitialPositions")
+    void testStartWithoutStateDiscoversAndAssignsShards(InitialPosition initialPosition, String initialTimestamp) throws Throwable {
+        try (MockSplitEnumeratorContext<KinesisShardSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) {
+            TestKinesisStreamProxy streamProxy = getTestStreamProxy();
+            final Properties consumerConfig = new Properties();
+            consumerConfig.setProperty(SourceConfigConstants.STREAM_INITIAL_POSITION, initialPosition.name());
+            consumerConfig.setProperty(SourceConfigConstants.STREAM_INITIAL_TIMESTAMP, initialTimestamp);
+
+            // Given enumerator is initialized with no state
+            KinesisStreamsSourceEnumerator enumerator = new KinesisStreamsSourceEnumerator(context, STREAM_ARN,
+                consumerConfig,
+                streamProxy,
+                null);
+            // When enumerator starts
+            enumerator.start();
+            // Then initial discovery scheduled, with periodic discovery after
+            assertThat(context.getOneTimeCallables())
+                .hasSize(1);
+            assertThat(context.getPeriodicCallables())
+                .hasSize(1);
+
+            // Given there is one registered reader, with 4 shards in stream
+            final int subtaskId = 1;
+            context.registerReader(TestUtil.getTestReaderInfo(subtaskId));
+            enumerator.addReader(subtaskId);
+            String[] shardIds = new String[]{generateShardId(0), generateShardId(1), generateShardId(2), generateShardId(3)};
+            streamProxy.addShards(shardIds);
+            // When first discovery runs
+            context.runNextOneTimeCallable();
+            SplitsAssignment<KinesisShardSplit> initialSplitAssignment = context.getSplitsAssignmentSequence().get(0);
+            // Then all 4 shards discovered on startup with configured INITIAL_POSITION
+            assertThat(initialSplitAssignment
+                .assignment())
+                .containsOnlyKeys(subtaskId);
+            assertThat(initialSplitAssignment
+                .assignment()
+                .get(subtaskId)
+                .stream()
+                .map(KinesisShardSplit::getShardId))
+                .containsExactly(shardIds);
+            assertThat(initialSplitAssignment
+                .assignment()
+                .get(subtaskId)
+                .stream()
+                .map(KinesisShardSplit::getStartingPosition))
+                .allSatisfy(s -> assertThat(s).isEqualTo(initialPosition.name()));
+
+            // Given no resharding occurs (list of shards remains the same)
+            // When first periodic discovery runs
+            context.runPeriodicCallable(0);
+            // Then no additional splits are assigned
+            SplitsAssignment<KinesisShardSplit> noUpdateSplitAssignment = context.getSplitsAssignmentSequence().get(1);
+            assertThat(noUpdateSplitAssignment.assignment())
+                .isEmpty();
+
+            // Given resharding occurs
+            String[] additionalShards = new String[]{generateShardId(4), generateShardId(5)};
+            streamProxy.addShards(additionalShards);
+            // When periodic discovery runs
+            context.runPeriodicCallable(0);
+            // Then only additional shards are assigned to read from TRIM_HORIZON
+            SplitsAssignment<KinesisShardSplit> afterReshardingSplitAssignment = context.getSplitsAssignmentSequence().get(2);
+            assertThat(afterReshardingSplitAssignment
+                .assignment())
+                .containsOnlyKeys(subtaskId);
+            assertThat(afterReshardingSplitAssignment
+                .assignment()
+                .get(subtaskId)
+                .stream()
+                .map(KinesisShardSplit::getShardId))
+                .containsExactly(additionalShards);
+            assertThat(afterReshardingSplitAssignment
+                .assignment()
+                .get(subtaskId)
+                .stream()
+                .map(KinesisShardSplit::getStartingPosition))
+                .allSatisfy(s -> assertThat(s).isEqualTo(InitialPosition.TRIM_HORIZON.name()));
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideInitialPositions")
+    void testStartWithStateIgnoresCompletedShards(InitialPosition initialPosition, String initialTimestamp) throws Throwable {
+        try (MockSplitEnumeratorContext<KinesisShardSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) {
+            TestKinesisStreamProxy streamProxy = getTestStreamProxy();
+            final String completedShard = generateShardId(0);
+            final String lastSeenShard = generateShardId(1);
+            final Set<String> completedShardIds = ImmutableSet.of(completedShard);
+
+            KinesisStreamsSourceEnumeratorState state = new KinesisStreamsSourceEnumeratorState(
+                completedShardIds,
+                Collections.emptySet(),
+                lastSeenShard);
+
+            final Properties consumerConfig = new Properties();
+            consumerConfig.setProperty(SourceConfigConstants.STREAM_INITIAL_POSITION, initialPosition.name());
+            consumerConfig.setProperty(SourceConfigConstants.STREAM_INITIAL_TIMESTAMP, initialTimestamp);
+
+            // Given enumerator is initialised with state
+            KinesisStreamsSourceEnumerator enumerator = new KinesisStreamsSourceEnumerator(context, STREAM_ARN,
+                consumerConfig,
+                streamProxy,
+                state);
+            // When enumerator starts
+            enumerator.start();
+            // Then no initial discovery is scheduled, but a periodic discovery is scheduled
+            assertThat(context.getOneTimeCallables())
+                .isEmpty();
+            assertThat(context.getPeriodicCallables())
+                .hasSize(1);
+
+            // Given there is one registered reader, with 4 shards in stream
+            final int subtaskId = 1;
+            context.registerReader(TestUtil.getTestReaderInfo(subtaskId));
+            enumerator.addReader(subtaskId);
+            String[] shardIds = new String[]{completedShard, lastSeenShard, generateShardId(2), generateShardId(3)};
+            streamProxy.addShards(shardIds);
+            // When first periodic discovery of shards
+            context.runPeriodicCallable(0);
+            // Then newer shards will be discovered and read from TRIM_HORIZON, independent of configured starting position
+            SplitsAssignment<KinesisShardSplit> firstUpdateSplitAssignment = context.getSplitsAssignmentSequence().get(0);
+            assertThat(firstUpdateSplitAssignment
+                .assignment())
+                .containsOnlyKeys(subtaskId);
+            assertThat(firstUpdateSplitAssignment
+                .assignment()
+                .get(subtaskId)
+                .stream()
+                .map(KinesisShardSplit::getShardId))
+                .containsExactly(generateShardId(2), generateShardId(3));
+            assertThat(firstUpdateSplitAssignment
+                .assignment()
+                .get(subtaskId)
+                .stream()
+                .map(KinesisShardSplit::getStartingPosition))
+                .allSatisfy(s -> assertThat(s).isEqualTo(InitialPosition.TRIM_HORIZON.name()));
+        }
+    }
+
+    @Test
+    void testReturnedSplitsWillBeReassigned() throws Throwable {
+        try (MockSplitEnumeratorContext<KinesisShardSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) {
+            TestKinesisStreamProxy streamProxy = getTestStreamProxy();
+            KinesisStreamsSourceEnumerator enumerator = getSimpleEnumeratorWithNoState(context, streamProxy);
+
+            // Given enumerator is initialised with one registered reader, with 4 shards in stream
+            final int subtaskId = 1;
+            context.registerReader(TestUtil.getTestReaderInfo(subtaskId));
+            enumerator.addReader(subtaskId);
+            String[] shardIds = new String[]{generateShardId(0), generateShardId(1), generateShardId(2), generateShardId(3)};
+            streamProxy.addShards(shardIds);
+
+            // When first discovery runs
+            context.runNextOneTimeCallable();
+            SplitsAssignment<KinesisShardSplit> initialSplitAssignment = context.getSplitsAssignmentSequence().get(0);
+
+            // Then all 4 shards discovered on startup with configured INITIAL_POSITION
+            assertThat(initialSplitAssignment
+                .assignment())
+                .containsOnlyKeys(subtaskId);
+            assertThat(initialSplitAssignment
+                .assignment()
+                .get(subtaskId)
+                .stream()
+                .map(KinesisShardSplit::getShardId))
+                .containsExactly(shardIds);
+
+            // Given one shard split is returned
+            KinesisShardSplit returnedSplit = initialSplitAssignment.assignment().get(subtaskId).get(0);
+            enumerator.addSplitsBack(ImmutableList.of(returnedSplit), subtaskId);
+
+            // When first periodic discovery runs
+            context.runPeriodicCallable(0);
+            // Then returned split will be assigned
+            SplitsAssignment<KinesisShardSplit> firstReturnedSplitAssignment = context.getSplitsAssignmentSequence().get(1);
+            assertThat(firstReturnedSplitAssignment.assignment())
+                .containsOnlyKeys(subtaskId);
+            assertThat(firstReturnedSplitAssignment
+                .assignment()
+                .get(subtaskId))
+                .containsExactly(returnedSplit);
+        }
+    }
+
+    @Test
+    void testAddSplitsBackWithoutSplitIsNoOp() throws Throwable {
+        try (MockSplitEnumeratorContext<KinesisShardSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) {
+            TestKinesisStreamProxy streamProxy = getTestStreamProxy();
+            KinesisStreamsSourceEnumerator enumerator = getSimpleEnumeratorWithNoState(context, streamProxy);
+            List<KinesisShardSplit> splits = ImmutableList.of(getTestSplit());
+
+            // Given enumerator has no assigned splits
+            // When we add splits back
+            // Then handled gracefully with no exception thrown
+            assertThatNoException()
+                .isThrownBy(() -> enumerator.addSplitsBack(splits, 1));
+        }
+    }
+
+    @Test
+    void testHandleSplitRequestIsNoOp() throws Throwable {
+        try (MockSplitEnumeratorContext<KinesisShardSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) {
+            TestKinesisStreamProxy streamProxy = getTestStreamProxy();
+            KinesisStreamsSourceEnumerator enumerator = getSimpleEnumeratorWithNoState(context, streamProxy);
+
+            // Given enumerator has no assigned splits
+            // When we add splits back
+            // Then handled gracefully with no exception thrown
+            assertThatNoException()
+                .isThrownBy(() -> enumerator.handleSplitRequest( 1, "some-hostname"));
+        }
+    }
+
+

Review Comment:
   nit: unnecessary new line



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/SourceConfigConstants.java:
##########
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.kinesis.source.reader.PollingKinesisShardSplitReader;
+
+import java.time.Duration;
+
+@PublicEvolving
+public class SourceConfigConstants extends AWSConfigConstants {
+    public enum InitialPosition {
+        LATEST,
+        TRIM_HORIZON,
+        AT_TIMESTAMP
+    }
+
+    /** The record publisher type represents the record-consume style. */
+    public enum RecordPublisherType {
+
+        /** Consume the Kinesis records using AWS SDK v2 with the enhanced fan-out consumer. */
+        EFO,
+        /** Consume the Kinesis records using AWS SDK v1 with the get-records method. */
+        POLLING
+    }
+
+    /** The EFO registration type represents how we are going to de-/register efo consumer. */
+    public enum EFORegistrationType {
+
+        /**
+         * Delay the registration of efo consumer for taskmanager to execute. De-register the efo
+         * consumer for taskmanager to execute when task is shut down.
+         */
+        LAZY,
+        /**
+         * Register the efo consumer eagerly for jobmanager to execute. De-register the efo consumer
+         * the same way as lazy does.
+         */
+        EAGER,
+        /** Do not register efo consumer programmatically. Do not de-register either. */
+        NONE
+    }
+
+    /** The RecordPublisher type (EFO|POLLING). */
+    public static final String RECORD_PUBLISHER_TYPE = "flink.stream.recordpublisher";
+
+    public static final String DEFAULT_RECORD_PUBLISHER_TYPE =
+            RecordPublisherType.POLLING.toString();
+
+    /** Determine how and when consumer de-/registration is performed (LAZY|EAGER|NONE). */
+    public static final String EFO_REGISTRATION_TYPE = "flink.stream.efo.registration";
+
+    public static final String DEFAULT_EFO_REGISTRATION_TYPE = EFORegistrationType.EAGER.toString();
+
+    /** The name of the EFO consumer to register with KDS. */
+    public static final String EFO_CONSUMER_NAME = "flink.stream.efo.consumername";
+
+    /** The prefix of consumer ARN for a given stream. */
+    public static final String EFO_CONSUMER_ARN_PREFIX = "flink.stream.efo.consumerarn";
+
+    /** The initial position to start reading Kinesis streams from. */
+    public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
+
+    public static final String DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST.toString();
+
+    /**
+     * The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for
+     * STREAM_INITIAL_POSITION).
+     */
+    public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";
+
+    /**
+     * The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP
+     * is set for STREAM_INITIAL_POSITION).
+     */
+    public static final String STREAM_TIMESTAMP_DATE_FORMAT =
+            "flink.stream.initpos.timestamp.format";
+
+    public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT =
+            "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
+
+    /** The maximum number of describeStream attempts if we get a recoverable exception. */
+    public static final String STREAM_DESCRIBE_RETRIES = "flink.stream.describe.maxretries";
+
+    public static final int DEFAULT_STREAM_DESCRIBE_RETRIES = 50;
+
+    /** The base backoff time between each describeStream attempt. */
+    public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
+
+    public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 2000L;
+
+    /** The maximum backoff time between each describeStream attempt. */
+    public static final String STREAM_DESCRIBE_BACKOFF_MAX = "flink.stream.describe.backoff.max";
+
+    public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L;
+
+    /** The power constant for exponential backoff between each describeStream attempt. */
+    public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT =
+            "flink.stream.describe.backoff.expconst";
+
+    public static final double DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+    /** The maximum number of listShards attempts if we get a recoverable exception. */
+    public static final String LIST_SHARDS_RETRIES = "flink.list.shards.maxretries";
+
+    public static final int DEFAULT_LIST_SHARDS_RETRIES = 10;
+
+    /** The base backoff time between each listShards attempt. */
+    public static final String LIST_SHARDS_BACKOFF_BASE = "flink.list.shards.backoff.base";
+
+    public static final long DEFAULT_LIST_SHARDS_BACKOFF_BASE = 1000L;
+
+    /** The maximum backoff time between each listShards attempt. */
+    public static final String LIST_SHARDS_BACKOFF_MAX = "flink.list.shards.backoff.max";
+
+    public static final long DEFAULT_LIST_SHARDS_BACKOFF_MAX = 5000L;
+
+    /** The power constant for exponential backoff between each listShards attempt. */
+    public static final String LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT =
+            "flink.list.shards.backoff.expconst";
+
+    public static final double DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+    /** The maximum number of describeStreamConsumer attempts if we get a recoverable exception. */
+    public static final String DESCRIBE_STREAM_CONSUMER_RETRIES =
+            "flink.stream.describestreamconsumer.maxretries";
+
+    public static final int DEFAULT_DESCRIBE_STREAM_CONSUMER_RETRIES = 50;
+
+    /** The base backoff time between each describeStreamConsumer attempt. */
+    public static final String DESCRIBE_STREAM_CONSUMER_BACKOFF_BASE =
+            "flink.stream.describestreamconsumer.backoff.base";
+
+    public static final long DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_BASE = 2000L;
+
+    /** The maximum backoff time between each describeStreamConsumer attempt. */
+    public static final String DESCRIBE_STREAM_CONSUMER_BACKOFF_MAX =
+            "flink.stream.describestreamconsumer.backoff.max";
+
+    public static final long DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_MAX = 5000L;
+
+    /** The power constant for exponential backoff between each describeStreamConsumer attempt. */
+    public static final String DESCRIBE_STREAM_CONSUMER_BACKOFF_EXPONENTIAL_CONSTANT =
+            "flink.stream.describestreamconsumer.backoff.expconst";
+
+    public static final double DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+    /** The maximum number of registerStreamConsumer attempts if we get a recoverable exception. */
+    public static final String REGISTER_STREAM_CONSUMER_RETRIES =
+            "flink.stream.registerstreamconsumer.maxretries";
+
+    public static final int DEFAULT_REGISTER_STREAM_CONSUMER_RETRIES = 10;
+
+    /**
+     * The maximum time in seconds to wait for a stream consumer to become active before giving up.
+     */
+    public static final String REGISTER_STREAM_CONSUMER_TIMEOUT_SECONDS =
+            "flink.stream.registerstreamconsumer.timeout";
+
+    public static final Duration DEFAULT_REGISTER_STREAM_CONSUMER_TIMEOUT = Duration.ofSeconds(60);
+
+    /** The base backoff time between each registerStreamConsumer attempt. */
+    public static final String REGISTER_STREAM_CONSUMER_BACKOFF_BASE =
+            "flink.stream.registerstreamconsumer.backoff.base";
+
+    public static final long DEFAULT_REGISTER_STREAM_CONSUMER_BACKOFF_BASE = 500L;
+
+    /** The maximum backoff time between each registerStreamConsumer attempt. */
+    public static final String REGISTER_STREAM_CONSUMER_BACKOFF_MAX =
+            "flink.stream.registerstreamconsumer.backoff.max";
+
+    public static final long DEFAULT_REGISTER_STREAM_CONSUMER_BACKOFF_MAX = 2000L;
+
+    /** The power constant for exponential backoff between each registerStreamConsumer attempt. */
+    public static final String REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT =
+            "flink.stream.registerstreamconsumer.backoff.expconst";
+
+    public static final double DEFAULT_REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+    /**
+     * The maximum number of deregisterStreamConsumer attempts if we get a recoverable exception.
+     */
+    public static final String DEREGISTER_STREAM_CONSUMER_RETRIES =
+            "flink.stream.deregisterstreamconsumer.maxretries";
+
+    public static final int DEFAULT_DEREGISTER_STREAM_CONSUMER_RETRIES = 10;
+
+    /** The maximum time in seconds to wait for a stream consumer to deregister before giving up. */
+    public static final String DEREGISTER_STREAM_CONSUMER_TIMEOUT_SECONDS =
+            "flink.stream.deregisterstreamconsumer.timeout";
+
+    public static final Duration DEFAULT_DEREGISTER_STREAM_CONSUMER_TIMEOUT =
+            Duration.ofSeconds(60);
+
+    /** The base backoff time between each deregisterStreamConsumer attempt. */
+    public static final String DEREGISTER_STREAM_BACKOFF_BASE =
+            "flink.stream.deregisterstreamconsumer.backoff.base";
+
+    public static final long DEFAULT_DEREGISTER_STREAM_CONSUMER_BACKOFF_BASE = 500L;
+
+    /** The maximum backoff time between each deregisterStreamConsumer attempt. */
+    public static final String DEREGISTER_STREAM_CONSUMER_BACKOFF_MAX =
+            "flink.stream.deregisterstreamconsumer.backoff.max";
+
+    public static final long DEFAULT_DEREGISTER_STREAM_CONSUMER_BACKOFF_MAX = 2000L;
+
+    /** The power constant for exponential backoff between each deregisterStreamConsumer attempt. */
+    public static final String DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT =

Review Comment:
   same comment as above applies here as well



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java:
##########


Review Comment:
   +1



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.InitialPosition;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION;
+import static org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.STREAM_INITIAL_POSITION;
+import static org.apache.flink.connector.kinesis.source.config.SourceConfigUtil.parseStreamTimestampStartingPosition;
+
+@PublicEvolving
+public class KinesisStreamsSourceEnumerator
+    implements SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
+
+    private final SplitEnumeratorContext<KinesisShardSplit> context;
+    private final String streamArn;
+    private final Properties consumerConfig;
+    private final StreamProxy streamProxy;
+    private final KinesisShardAssigner shardAssigner;
+    private final KinesisShardAssigner.Context shardAssignerContext;
+
+    private final Map<Integer, Set<KinesisShardSplit>> splitAssignment = new HashMap<>();
+    private final Set<KinesisShardSplit> assignedSplits = new HashSet<>();
+    private final Set<KinesisShardSplit> unassignedSplits;
+    private final Set<String> completedShards;
+
+    private String lastSeenShardId;
+
+    /**
+     * Creates a {@code KinesisStreamsSourceEnumerator}.
+     *
+     * <p>When {@code state} is {@code null} (fresh start, no checkpoint), the enumerator begins
+     * with empty completed-shard and unassigned-split tracking and no last-seen shard id;
+     * otherwise those three pieces of tracking state are restored from the given checkpointed
+     * state.
+     *
+     * @param context enumerator context used for split assignment and registered-reader lookup
+     * @param streamArn ARN of the Kinesis stream whose shards are enumerated
+     * @param consumerConfig consumer configuration properties
+     * @param streamProxy proxy used to communicate with Kinesis
+     * @param state restored enumerator state, or {@code null} on a fresh start
+     */
+    public KinesisStreamsSourceEnumerator(
+        SplitEnumeratorContext<KinesisShardSplit> context,
+        String streamArn,
+        Properties consumerConfig,
+        StreamProxy streamProxy,
+        KinesisStreamsSourceEnumeratorState state) {
+        this.context = context;
+        this.streamArn = streamArn;
+        this.consumerConfig = consumerConfig;
+        // NOTE(review): shard assigner is currently hard-coded to hash-based assignment;
+        // there is no way for users to plug in a different KinesisShardAssigner here.
+        this.shardAssigner = new HashShardAssigner();
+        this.shardAssignerContext =
+            new ShardAssignerContext(splitAssignment, context.registeredReaders());
+        if (state == null) {
+            this.completedShards = new HashSet<>();
+            this.lastSeenShardId = null;
+            this.unassignedSplits = new HashSet<>();
+        } else {
+            this.completedShards = state.getCompletedShardIds();
+            this.lastSeenShardId = state.getLastSeenShardId();
+            this.unassignedSplits = state.getUnassignedSplits();
+        }
+    }
+
+    @Override
+    public void start() {
+        if (lastSeenShardId == null) {
+            context.callAsync(this::initialDiscoverSplits, this::assignSplits);
+        }
+
+        context.callAsync(this::periodicallyDiscoverSplits, this::assignSplits, 10_000L, 10_000L);

Review Comment:
   Should this shard-discovery interval be configurable rather than hard-coded to 10 seconds? For reference, the existing connector exposes it as a config option:
   https://github.com/apache/flink-connector-aws/blob/17820d062007a4fcdf24c3b974c636f32705cd27/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L677



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org