You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "z3d1k (via GitHub)" <gi...@apache.org> on 2023/04/26 12:55:12 UTC

[GitHub] [flink-connector-aws] z3d1k commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

z3d1k commented on code in PR #49:
URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1177831924


##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/HashShardAssigner.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+
+@PublicEvolving
+public class HashShardAssigner implements KinesisShardAssigner {
+    @Override
+    public int assign(KinesisShardSplit split, Context context) {
+        Integer[] availableSubtasks = new Integer[] {};
+        availableSubtasks = context.getRegisteredReaders().keySet().toArray(availableSubtasks);
+        if (availableSubtasks.length < 1) {
+            throw new IllegalArgumentException(
+                    "Expected at least one registered reader. Unable to assign split.");
+        }

Review Comment:
   nit: using `Preconditions.checkArgument` might be cleaner.



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorStateSerializer.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/** TODO: Add JAvadoc. */
+@Internal
+public class KinesisStreamsSourceEnumeratorStateSerializer
+        implements SimpleVersionedSerializer<KinesisStreamsSourceEnumeratorState> {
+
+    private static final int CURRENT_VERSION = 0;
+
+    private final KinesisShardSplitSerializer splitSerializer;
+
+    public KinesisStreamsSourceEnumeratorStateSerializer(KinesisShardSplitSerializer splitSerializer) {
+        this.splitSerializer = splitSerializer;
+    }
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(KinesisStreamsSourceEnumeratorState kinesisStreamsSourceEnumeratorState)
+            throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(baos)) {
+
+            out.writeInt(getVersion());
+
+            boolean hasLastSeenShardId =
+                    kinesisStreamsSourceEnumeratorState.getLastSeenShardId() != null;
+            out.writeBoolean(hasLastSeenShardId);
+            if (hasLastSeenShardId) {
+                out.writeUTF(kinesisStreamsSourceEnumeratorState.getLastSeenShardId());
+            }
+
+            out.writeInt(kinesisStreamsSourceEnumeratorState.getCompletedShardIds().size());
+            for (String shardId : kinesisStreamsSourceEnumeratorState.getCompletedShardIds()) {
+                out.writeUTF(shardId);
+            }
+
+            out.writeInt(kinesisStreamsSourceEnumeratorState.getUnassignedSplits().size());
+            out.writeInt(splitSerializer.getVersion());
+            for (KinesisShardSplit split : kinesisStreamsSourceEnumeratorState.getUnassignedSplits()) {
+                byte[] serializedSplit = splitSerializer.serialize(split);
+                out.writeInt(serializedSplit.length);
+                out.write(serializedSplit);
+            }
+
+            out.flush();
+
+            return baos.toByteArray();
+        }
+    }
+
+    @Override
+    public KinesisStreamsSourceEnumeratorState deserialize(
+            int version, byte[] serializedEnumeratorState) throws IOException {
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedEnumeratorState);
+                DataInputStream in = new DataInputStream(bais)) {
+
+            final int serializerVersion = in.readInt();
+
+            if (serializerVersion != getVersion()) {
+                throw new IOException("Trying to deserialize KinesisStreamsSourceEnumeratorState serialized with unsupported version of " + getVersion());

Review Comment:
   Can a more specific exception be used here? The same applies to the other exceptions here.



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.InitialPosition;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION;
+import static org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.STREAM_INITIAL_POSITION;
+import static org.apache.flink.connector.kinesis.source.config.SourceConfigUtil.parseStreamTimestampStartingPosition;
+
+@PublicEvolving
+public class KinesisStreamsSourceEnumerator
+    implements SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
+
+    private final SplitEnumeratorContext<KinesisShardSplit> context;
+    private final String streamArn;
+    private final Properties consumerConfig;
+    private final StreamProxy streamProxy;
+    private final KinesisShardAssigner shardAssigner;
+    private final KinesisShardAssigner.Context shardAssignerContext;
+
+    private final Map<Integer, Set<KinesisShardSplit>> splitAssignment = new HashMap<>();
+    private final Set<KinesisShardSplit> assignedSplits = new HashSet<>();
+    private final Set<KinesisShardSplit> unassignedSplits;
+    private final Set<String> completedShards;
+
+    private String lastSeenShardId;
+
+    public KinesisStreamsSourceEnumerator(
+        SplitEnumeratorContext<KinesisShardSplit> context,
+        String streamArn,
+        Properties consumerConfig,
+        StreamProxy streamProxy,
+        KinesisStreamsSourceEnumeratorState state) {
+        this.context = context;
+        this.streamArn = streamArn;
+        this.consumerConfig = consumerConfig;
+        this.streamProxy = streamProxy;
+        this.shardAssigner = new HashShardAssigner();
+        this.shardAssignerContext =
+            new ShardAssignerContext(splitAssignment, context.registeredReaders());
+        if (state == null) {
+            this.completedShards = new HashSet<>();
+            this.lastSeenShardId = null;
+            this.unassignedSplits = new HashSet<>();
+        } else {
+            this.completedShards = state.getCompletedShardIds();
+            this.lastSeenShardId = state.getLastSeenShardId();
+            this.unassignedSplits = state.getUnassignedSplits();
+        }
+    }
+
+    @Override
+    public void start() {
+        if (lastSeenShardId == null) {
+            context.callAsync(this::initialDiscoverSplits, this::assignSplits);
+        }
+
+        context.callAsync(this::periodicallyDiscoverSplits, this::assignSplits, 10_000L, 10_000L);
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+        // Do nothing, since we assign splits eagerly
+    }
+
+    @Override
+    public void addSplitsBack(List<KinesisShardSplit> splits, int subtaskId) {
+        if (!splitAssignment.containsKey(subtaskId)) {
+            LOG.warn(
+                "Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}",
+                subtaskId,
+                splits);
+            return;
+        }
+
+        for (KinesisShardSplit split : splits) {
+            splitAssignment.get(subtaskId).remove(split);
+            assignedSplits.remove(split);
+            unassignedSplits.add(split);
+        }
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        splitAssignment.putIfAbsent(subtaskId, new HashSet<>());
+    }
+
+    @Override
+    public KinesisStreamsSourceEnumeratorState snapshotState(long checkpointId) throws Exception {
+        return new KinesisStreamsSourceEnumeratorState(completedShards, unassignedSplits, lastSeenShardId);
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    private List<KinesisShardSplit> initialDiscoverSplits() {
+        List<Shard> shards = streamProxy.listShards(streamArn, lastSeenShardId);
+        return mapToSplits(shards, false);
+    }
+
+    private List<KinesisShardSplit> periodicallyDiscoverSplits() {
+        List<Shard> shards = streamProxy.listShards(streamArn, lastSeenShardId);
+        // Any shard discovered after the initial startup should be read from the start, since they
+        // come from resharding
+        return mapToSplits(shards, true);
+    }
+
+    private List<KinesisShardSplit> mapToSplits(List<Shard> shards, boolean shouldReadFromStart) {
+        InitialPosition startingPosition =
+            shouldReadFromStart
+                ? InitialPosition.TRIM_HORIZON
+                : InitialPosition.valueOf(
+                consumerConfig
+                    .getOrDefault(
+                        STREAM_INITIAL_POSITION,
+                        DEFAULT_STREAM_INITIAL_POSITION)
+                    .toString());
+        long startingTimestamp = 0;
+        switch (startingPosition) {
+            case LATEST:
+                startingTimestamp = Instant.now().toEpochMilli();
+                break;
+            case AT_TIMESTAMP:
+                startingTimestamp = parseStreamTimestampStartingPosition(consumerConfig).getTime();
+                break;
+            case TRIM_HORIZON:
+            default:
+                // Since we are reading from TRIM_HORIZON, starting time epoch millis can be 0
+        }
+
+        List<KinesisShardSplit> splits = new ArrayList<>();
+        for (Shard shard : shards) {
+            splits.add(
+                new KinesisShardSplit(
+                    streamArn,
+                    shard.shardId(),
+                    startingPosition.toString(),
+                    startingTimestamp));
+        }
+
+        return splits;
+    }
+
+    private void assignSplits(List<KinesisShardSplit> discoveredSplits, Throwable t) {
+        if (t != null) {
+            throw new FlinkRuntimeException("Failed to list shards.", t);

Review Comment:
   Can a more specific exception be used here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org