Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/08/01 13:20:00 UTC

[jira] [Commented] (FLINK-9897) Further enhance adaptive reads in Kinesis Connector to depend on run loop time

    [ https://issues.apache.org/jira/browse/FLINK-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565297#comment-16565297 ] 

ASF GitHub Bot commented on FLINK-9897:
---------------------------------------

asfgit closed pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive reads depend on run loop time instead of fetchintervalmillis
URL: https://github.com/apache/flink/pull/6408
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 77d180cc395..b14c6a434d8 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -207,7 +207,7 @@ public void run() {
 				}
 			}
 
-			long lastTimeNanos = 0;
+			long processingStartTimeNanos = System.nanoTime();
 			while (isRunning()) {
 				if (nextShardItr == null) {
 					fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
@@ -215,14 +215,6 @@ public void run() {
 					// we can close this consumer thread once we've reached the end of the subscribed shard
 					break;
 				} else {
-					if (fetchIntervalMillis != 0) {
-						long elapsedTimeNanos = System.nanoTime() - lastTimeNanos;
-						long sleepTimeMillis = fetchIntervalMillis - (elapsedTimeNanos / 1_000_000);
-						if (sleepTimeMillis > 0) {
-							Thread.sleep(sleepTimeMillis);
-						}
-						lastTimeNanos = System.nanoTime();
-					}
 
 					GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
 
@@ -233,19 +225,17 @@ public void run() {
 						subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
 					long recordBatchSizeBytes = 0L;
-					long averageRecordSizeBytes = 0L;
-
 					for (UserRecord record : fetchedRecords) {
 						recordBatchSizeBytes += record.getData().remaining();
 						deserializeRecordForCollectionAndUpdateState(record);
 					}
 
-					if (useAdaptiveReads && !fetchedRecords.isEmpty()) {
-						averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size();
-						maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-					}
-
 					nextShardItr = getRecordsResult.getNextShardIterator();
+
+					long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, System.nanoTime());
+					long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos;
+					maxNumberOfRecordsPerFetch = adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes, maxNumberOfRecordsPerFetch);
+					processingStartTimeNanos = adjustmentEndTimeNanos; // for next time through the loop
 				}
 			}
 		} catch (Throwable t) {
@@ -253,6 +243,50 @@ public void run() {
 		}
 	}
 
+	/**
+	 * Adjusts loop timing to match target frequency if specified.
+	 * @param processingStartTimeNanos The start time of the run loop "work"
+	 * @param processingEndTimeNanos The end time of the run loop "work"
+	 * @return The System.nanoTime() after the sleep (if any)
+	 * @throws InterruptedException If the thread is interrupted while sleeping
+	 */
+	protected long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos)
+		throws InterruptedException {
+		long endTimeNanos = processingEndTimeNanos;
+		if (fetchIntervalMillis != 0) {
+			long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos;
+			long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000);
+			if (sleepTimeMillis > 0) {
+				Thread.sleep(sleepTimeMillis);
+				endTimeNanos = System.nanoTime();
+			}
+		}
+		return endTimeNanos;
+	}
+
+	/**
+	 * Calculates how many records to read each time through the loop based on a target throughput
+	 * and the measured frequency of the loop.
+	 * @param runLoopTimeNanos The total time of one pass through the loop
+	 * @param numRecords The number of records of the last read operation
+	 * @param recordBatchSizeBytes The total batch size of the last read operation
+	 * @param maxNumberOfRecordsPerFetch The current maxNumberOfRecordsPerFetch
+	 */
+	private int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes,
+			int maxNumberOfRecordsPerFetch) {
+		if (useAdaptiveReads && numRecords != 0 && runLoopTimeNanos != 0) {
+			long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;
+			// Adjust number of records to fetch from the shard depending on current average record size
+			// to optimize for the 2 MB/sec shard read limit
+			double loopFrequencyHz = 1000000000.0d / runLoopTimeNanos;
+			double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
+			maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes);
+			// Ensure the value is not more than 10000L
+			maxNumberOfRecordsPerFetch = Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
+		}
+		return maxNumberOfRecordsPerFetch;
+	}
+
 	/**
 	 * The loop in run() checks this before fetching next batch of records. Since this runnable will be executed
 	 * by the ExecutorService {@link KinesisDataFetcher#shardConsumersExecutor}, the only way to close down this thread
@@ -347,23 +381,4 @@ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) thr
 	protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
 		return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
 	}
-
-	/**
-	 * Adapts the maxNumberOfRecordsPerFetch based on the current average record size
-	 * to optimize 2 Mb / sec read limits.
-	 *
-	 * @param averageRecordSizeBytes
-	 * @return adaptedMaxRecordsPerFetch
-	 */
-
-	protected int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
-		int adaptedMaxRecordsPerFetch = maxNumberOfRecordsPerFetch;
-		if (averageRecordSizeBytes != 0 && fetchIntervalMillis != 0) {
-				adaptedMaxRecordsPerFetch = (int) (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / (averageRecordSizeBytes * 1000L / fetchIntervalMillis));
-
-				// Ensure the value is not more than 10000L
-				adaptedMaxRecordsPerFetch = Math.min(adaptedMaxRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
-			}
-		return adaptedMaxRecordsPerFetch;
-	}
 }
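
For readers skimming the diff above, here is a minimal standalone sketch of the
merged run-loop technique, with the ShardConsumer fields (fetchIntervalMillis,
maxNumberOfRecordsPerFetch) passed as plain parameters, the useAdaptiveReads
flag assumed true, and the fetch simulated with a sleep. It illustrates the
idea under those assumptions; it is not the exact production code:

import java.util.concurrent.ThreadLocalRandom;

public class AdaptiveReadSketch {

	// Kinesis per-shard read limit: 2 MB/sec (mirrors KINESIS_SHARD_BYTES_PER_SECOND_LIMIT).
	private static final long SHARD_BYTES_PER_SECOND_LIMIT = 2L * 1024L * 1024L;
	// Kinesis cap on records per GetRecords call (mirrors DEFAULT_SHARD_GETRECORDS_MAX).
	private static final int MAX_RECORDS_PER_FETCH_CAP = 10000;

	// Mirrors adjustRunLoopFrequency(): sleep out the remainder of the fetch interval,
	// and return the nanoTime observed after any sleep so the caller measures the full loop.
	static long adjustRunLoopFrequency(long fetchIntervalMillis, long startNanos, long endNanos)
			throws InterruptedException {
		long adjustedEndNanos = endNanos;
		if (fetchIntervalMillis != 0) {
			long sleepMillis = fetchIntervalMillis - (endNanos - startNanos) / 1_000_000;
			if (sleepMillis > 0) {
				Thread.sleep(sleepMillis);
				adjustedEndNanos = System.nanoTime();
			}
		}
		return adjustedEndNanos;
	}

	// Mirrors adaptRecordsToRead(): derive the per-read byte budget from the measured
	// loop frequency, then convert it to a record count via the average record size.
	static int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long batchSizeBytes,
			int maxRecordsPerFetch) {
		if (numRecords != 0 && runLoopTimeNanos != 0) {
			long averageRecordSizeBytes = batchSizeBytes / numRecords;
			double loopFrequencyHz = 1_000_000_000.0d / runLoopTimeNanos;
			double bytesPerRead = SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
			maxRecordsPerFetch = Math.min((int) (bytesPerRead / averageRecordSizeBytes),
				MAX_RECORDS_PER_FETCH_CAP);
		}
		return maxRecordsPerFetch;
	}

	public static void main(String[] args) throws InterruptedException {
		int maxRecordsPerFetch = MAX_RECORDS_PER_FETCH_CAP;
		long loopStartNanos = System.nanoTime();
		for (int i = 0; i < 3; i++) {
			// Simulate a fetch + deserialization pass that may overrun the 200 ms interval.
			Thread.sleep(ThreadLocalRandom.current().nextLong(50, 500));
			int fetchedRecords = 500;                     // pretend GetRecords returned 500 records
			long batchSizeBytes = fetchedRecords * 1024L; // pretend each record is 1 KiB
			long loopEndNanos = adjustRunLoopFrequency(200L, loopStartNanos, System.nanoTime());
			long runLoopTimeNanos = loopEndNanos - loopStartNanos;
			maxRecordsPerFetch = adaptRecordsToRead(
				runLoopTimeNanos, fetchedRecords, batchSizeBytes, maxRecordsPerFetch);
			System.out.printf("loop took %d ms -> next fetch asks for up to %d records%n",
				runLoopTimeNanos / 1_000_000, maxRecordsPerFetch);
			loopStartNanos = loopEndNanos; // start timing the next pass from here
		}
	}
}

Note how a pass that takes 500 ms (2 Hz) earns a roughly 1 MiB per-read budget,
while a pass that finishes within the 200 ms interval (5 Hz) gets about 0.4 MiB;
the old fetchIntervalMillis-based formula always assumed the latter.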


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Further enhance adaptive reads in Kinesis Connector to depend on run loop time
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-9897
>                 URL: https://issues.apache.org/jira/browse/FLINK-9897
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.4.2, 1.5.1
>            Reporter: Lakshmi Rao
>            Assignee: Lakshmi Rao
>            Priority: Major
>              Labels: pull-request-available
>
> In FLINK-9692, we introduced the ability for the shardConsumer to adaptively read more records based on the current average record size, in order to make full use of the 2 MB/sec shard read limit. That computation derives maxNumberOfRecordsPerFetch from an assumed read frequency of 5 reads/sec (the maximum prescribed by Kinesis limits). When an application takes longer to process records in the run loop, it can no longer read at 5 reads/sec (even though its fetchIntervalMillis may be set to 200 ms), so the computed maxNumberOfRecordsPerFetch ends up too small and the shard is under-read. In such a scenario, maxNumberOfRecordsPerFetch should instead be calculated from the time the run loop actually takes, as opposed to fetchIntervalMillis.
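
To make the difference concrete, here is the arithmetic under some illustrative
numbers: 1 KiB average records, fetchIntervalMillis = 200 ms, and a run loop
that actually takes 500 ms per pass.

Old formula (FLINK-9692), which assumes the loop really runs at 5 reads/sec:
    maxNumberOfRecordsPerFetch = 2,097,152 / (1,024 * 1000 / 200)
                               = 2,097,152 / 5,120 = 409 records/read
    actual throughput at the real 2 reads/sec: 2 * 409 KiB/sec ~ 0.8 MB/sec

New formula, based on the measured run loop time:
    loop frequency = 1 / 0.5 s = 2 Hz
    bytes per read = 2,097,152 / 2 = 1,048,576
    maxNumberOfRecordsPerFetch = 1,048,576 / 1,024 = 1,024 records/read
    actual throughput: 2 * 1 MiB/sec = 2 MB/sec, i.e. the shard limit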



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)