Posted to issues@flink.apache.org by tweise <gi...@git.apache.org> on 2018/07/11 21:34:54 UTC
[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...
Github user tweise commented on a diff in the pull request:
https://github.com/apache/flink/pull/6300#discussion_r201793459
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) thr
protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
}
+
+ /**
+ * Adapts the maxNumberOfRecordsPerFetch based on the current average record size
+ * to optimize 2 Mb / sec read limits.
+ *
+ * @param averageRecordSizeBytes
+ * @return adaptedMaxRecordsPerFetch
+ */
+
+ private int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
--- End diff --
Make this protected to allow for override? (Currently the shard consumer as a whole cannot be customized, but I think it should be.)
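
To illustrate the suggestion, here is a rough sketch of what a protected hook could enable. It is not the code from this PR: `SketchShardConsumer`, `ConservativeShardConsumer`, `OverrideSketch`, the constants, and the constructor parameters are all hypothetical stand-ins, and the formula is only one plausible reading of the Javadoc above (it assumes the "2 Mb / sec" limit means 2 * 1024 * 1024 bytes per second per shard, and a cap of 10,000 records per GetRecords call).

```java
// Hypothetical stand-in for ShardConsumer; NOT the actual Flink class.
class SketchShardConsumer {
    // Assumption: per-shard read limit of 2 MB/sec, taken from the Javadoc in the diff.
    static final long SHARD_BYTES_PER_SECOND_LIMIT = 2L * 1024L * 1024L;
    // Assumption: upper bound on records requested in a single GetRecords call.
    static final int MAX_RECORDS_PER_GET = 10_000;

    private final long fetchIntervalMillis;
    private final int defaultMaxRecordsPerFetch;

    SketchShardConsumer(long fetchIntervalMillis, int defaultMaxRecordsPerFetch) {
        this.fetchIntervalMillis = fetchIntervalMillis;
        this.defaultMaxRecordsPerFetch = defaultMaxRecordsPerFetch;
    }

    // Protected rather than private, so a subclass can replace the policy.
    protected int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
        if (averageRecordSizeBytes <= 0 || fetchIntervalMillis <= 0) {
            // No usable measurement yet: fall back to the configured default.
            return defaultMaxRecordsPerFetch;
        }
        // Bytes that fit into one fetch interval without exceeding the limit.
        long bytesPerFetch = SHARD_BYTES_PER_SECOND_LIMIT * fetchIntervalMillis / 1000L;
        long records = bytesPerFetch / averageRecordSizeBytes;
        // Always request at least one record, and never more than the cap.
        return (int) Math.max(1L, Math.min(records, (long) MAX_RECORDS_PER_GET));
    }
}

// Hypothetical subclass: requests half of what the base policy would allow.
class ConservativeShardConsumer extends SketchShardConsumer {
    ConservativeShardConsumer(long fetchIntervalMillis, int defaultMaxRecordsPerFetch) {
        super(fetchIntervalMillis, defaultMaxRecordsPerFetch);
    }

    @Override
    protected int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
        return Math.max(1, super.getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes) / 2);
    }
}

class OverrideSketch {
    public static void main(String[] args) {
        SketchShardConsumer base = new SketchShardConsumer(200L, 10_000);
        SketchShardConsumer conservative = new ConservativeShardConsumer(200L, 10_000);
        // Same call site, different policy, chosen by the runtime type.
        System.out.println(base.getAdaptiveMaxRecordsPerFetch(1024L));
        System.out.println(conservative.getAdaptiveMaxRecordsPerFetch(1024L));
    }
}
```

With a private method the subclass could not change the policy at all; a protected method keeps the default behavior while leaving room for a custom one.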
---