You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/11 20:52:15 UTC

[GitHub] [flink] dannycranmer commented on a change in pull request #12881: [FLINK-18512][kinesis] Introducing RecordPublisher. Refactor ShardConsumer to use PollingRecordPublisher

dannycranmer commented on a change in pull request #12881:
URL: https://github.com/apache/flink/pull/12881#discussion_r468857567



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/AdaptivePollingRecordPublisher.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+import java.util.function.Consumer;
+
+/**
+ * An adaptive record publisher to add a dynamic loop delay and batch read size for {@link PollingRecordPublisher}.
+ * Kinesis Streams have quotas on the transactions per second, and throughput. This class attempts to balance
+ * quotas and mitigate back off errors.
+ */
+@Internal
+public class AdaptivePollingRecordPublisher extends PollingRecordPublisher {
+	// AWS Kinesis has a read limit of 2 MB/sec per shard
+	// https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
+	private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024L * 1024L;
+
+	private int lastRecordBatchSize = 0;
+
+	private long lastRecordBatchSizeInBytes = 0;
+
+	private long processingStartTimeNanos = System.nanoTime();
+
+	private int maxNumberOfRecordsPerFetch;
+
+	private final long fetchIntervalMillis;
+
+	private final PollingRecordPublisherMetricsReporter metricsReporter;
+
+	public AdaptivePollingRecordPublisher(

Review comment:
       Done

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.function.Consumer;
+
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
+
+/**
+ * A {@link RecordPublisher} that will read records from Kinesis and forward them to the subscriber.
+ * Records are consumed by polling the GetRecords KDS API using a ShardIterator.
+ */
+@Internal
+public class PollingRecordPublisher implements RecordPublisher {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PollingRecordPublisher.class);
+
+	private final PollingRecordPublisherMetricsReporter metricsReporter;
+
+	private final KinesisProxyInterface kinesisProxy;
+
+	private final StreamShardHandle subscribedShard;
+
+	private String nextShardItr;
+
+	private final int maxNumberOfRecordsPerFetch;
+
+	private final long expiredIteratorBackoffMillis;
+
+	/**
+	 * A Polling implementation of {@link RecordPublisher} that polls kinesis for records.
+	 * The following KDS services are used: GetRecords and GetShardIterator.
+	 *
+	 * @param subscribedShard the shard to consume from
+	 * @param metricsReporter a metric reporter used to output metrics
+	 * @param kinesisProxy the proxy used to communicate with kinesis
+	 * @param maxNumberOfRecordsPerFetch the maximum number of records to retrieve per batch
+	 * @param expiredIteratorBackoffMillis the duration to sleep in the event of an {@link ExpiredIteratorException}
+	 */
+	public PollingRecordPublisher(
+			final StreamShardHandle subscribedShard,
+			final PollingRecordPublisherMetricsReporter metricsReporter,
+			final KinesisProxyInterface kinesisProxy,
+			final int maxNumberOfRecordsPerFetch,
+			final long expiredIteratorBackoffMillis) {
+		this.subscribedShard = subscribedShard;

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org