Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/16 02:19:00 UTC

[jira] [Commented] (FLINK-4574) Strengthen fetch interval implementation in Kinesis consumer

    [ https://issues.apache.org/jira/browse/FLINK-4574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651050#comment-16651050 ] 

ASF GitHub Bot commented on FLINK-4574:
---------------------------------------

tony810430 closed pull request #2925: [FLINK-4574] [kinesis] Strengthen fetch interval implementation in Kinesis consumer
URL: https://github.com/apache/flink/pull/2925
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 612a4a7b273..2da0c912771 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -38,6 +38,10 @@
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Properties;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -64,6 +68,9 @@
 
 	private SequenceNumber lastSequenceNum;
 
+	/** Reference to the first error thrown by the {@link ShardConsumerFetcher} threads */
+	private final AtomicReference<Throwable> error;
+
 	/**
 	 * Creates a shard consumer.
 	 *
@@ -81,7 +88,7 @@ public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
 			subscribedShard,
 			lastSequenceNum,
 			KinesisProxy.create(fetcherRef.getConsumerConfiguration()));
-	}
+		}
 
 	/** This constructor is exposed for testing purposes */
 	protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
@@ -107,27 +114,30 @@ protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
 		this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
 			ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
 			Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
+
+		this.error = new AtomicReference<Throwable>();
 	}
 
 	@SuppressWarnings("unchecked")
 	@Override
 	public void run() {
-		String nextShardItr;
+		String startShardItr;
+		Timer shardConsumerFetcherScheduler = new Timer();
 
 		try {
-			// before infinitely looping, we set the initial nextShardItr appropriately
+			// before infinitely looping, we set the initial startShardItr appropriately
 
 			if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
 				// if the shard is already closed, there will be no latest next record to get for this shard
 				if (subscribedShard.isClosed()) {
-					nextShardItr = null;
+					startShardItr = null;
 				} else {
-					nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
+					startShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
 				}
 			} else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
-				nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
+				startShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
 			} else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
-				nextShardItr = null;
+				startShardItr = null;
 			} else {
 				// we will be starting from an actual sequence number (due to restore from failure).
 				// if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records
@@ -154,42 +164,115 @@ public void run() {
 						}
 					}
 
-					// set the nextShardItr so we can continue iterating in the next while loop
-					nextShardItr = getRecordsResult.getNextShardIterator();
+					// set the startShardItr so we can continue iterating in the next while loop
+					startShardItr = getRecordsResult.getNextShardIterator();
 				} else {
 					// the last record was non-aggregated, so we can simply start from the next record
-					nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
+					startShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
 				}
 			}
 
-			while(isRunning()) {
-				if (nextShardItr == null) {
-					fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
-
-					// we can close this consumer thread once we've reached the end of the subscribed shard
-					break;
-				} else {
-					if (fetchIntervalMillis != 0) {
-						Thread.sleep(fetchIntervalMillis);
-					}
+			ArrayBlockingQueue<UserRecord> queue = new ArrayBlockingQueue<>(maxNumberOfRecordsPerFetch);
+			ShardConsumerFetcher shardConsumerFetcher;
 
-					GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
+			if (fetchIntervalMillis > 0L) {
+				shardConsumerFetcher = new ShardConsumerFetcher(this, startShardItr, queue, false);
+				shardConsumerFetcherScheduler.scheduleAtFixedRate(shardConsumerFetcher, 0L, fetchIntervalMillis);
+			} else {
+				// if fetchIntervalMillis is 0, make the task run forever and schedule it once only.
+				shardConsumerFetcher = new ShardConsumerFetcher(this, startShardItr, queue, true);
+				shardConsumerFetcherScheduler.schedule(shardConsumerFetcher, 0L);
+			}
 
-					// each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding
-					List<UserRecord> fetchedRecords = deaggregateRecords(
-						getRecordsResult.getRecords(),
-						subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
-						subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
+			while(isRunning()) {
+				UserRecord record = queue.poll();
+				if (record != null) {
+					deserializeRecordForCollectionAndUpdateState(record);
+				} else {
+					if (shardConsumerFetcher.nextShardItr == null) {
+						fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
 
-					for (UserRecord record : fetchedRecords) {
-						deserializeRecordForCollectionAndUpdateState(record);
+						// we can close this consumer thread once we've reached the end of the subscribed shard
+						break;
 					}
+				}
 
-					nextShardItr = getRecordsResult.getNextShardIterator();
+				Throwable throwable = this.error.get();
+				if (throwable != null) {
+					throw throwable;
 				}
 			}
 		} catch (Throwable t) {
 			fetcherRef.stopWithError(t);
+		} finally {
+			shardConsumerFetcherScheduler.cancel();
+		}
+	}
+
+	private class ShardConsumerFetcher extends TimerTask {
+		private String nextShardItr;
+
+		private final ShardConsumer<T> shardConsumerRef;
+
+		private final ArrayBlockingQueue<UserRecord> userRecordQueue;
+
+	/** The latest finish time of fetching data from Kinesis, used to recognize whether the following task has been delayed. */
+		private Long lastFinishTime = -1L;
+
+		private boolean runForever;
+
+		ShardConsumerFetcher(ShardConsumer<T> shardConsumerRef,
+							String nextShardItr,
+							ArrayBlockingQueue<UserRecord> userRecordQueue,
+							boolean runForever) {
+			this.shardConsumerRef = shardConsumerRef;
+			this.nextShardItr = nextShardItr;
+			this.userRecordQueue = userRecordQueue;
+			this.runForever = runForever;
+		}
+
+		@Override
+		public void run() {
+			GetRecordsResult getRecordsResult;
+			List<UserRecord> fetchedRecords;
+
+			try {
+				do {
+					if (nextShardItr != null) {
+						// skip logging this warning when runForever is true, since fetchIntervalMillis is 0
+						if (!runForever && this.scheduledExecutionTime() < lastFinishTime) {
+							// If the expected scheduled execution time is earlier than lastFinishTime,
+							// fetchIntervalMillis is likely too short for the previous task to finish in time.
+							LOG.warn("The value given for ShardConsumer is too short to finish getRecords on time.");
+						} else {
+							getRecordsResult = shardConsumerRef.getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
+
+							if (getRecordsResult != null) {
+								// each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding
+								fetchedRecords = deaggregateRecords(
+									getRecordsResult.getRecords(),
+									subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
+									subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
+
+								for (UserRecord record : fetchedRecords) {
+									userRecordQueue.put(record);
+								}
+
+								nextShardItr = getRecordsResult.getNextShardIterator();
+							} else {
+								// getRecordsResult is null because the iterator expired.
+								// Give up this task and get a new shard iterator for the next task.
+								nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
+							}
+							lastFinishTime = System.currentTimeMillis();
+						}
+					} else {
+						break;
+					}
+				} while (runForever);
+			} catch (Throwable t) {
+				shardConsumerRef.stopWithError(t);
+			}
 		}
 	}
 
@@ -203,6 +286,12 @@ private boolean isRunning() {
 		return !Thread.interrupted();
 	}
 
+	/** Called by the created TimerTask {@link ShardConsumerFetcher} to pass on errors. Only the first thrown error is set.
+	 * Once set, it will cause run() to throw the error and call stopWithError() in {@link KinesisDataFetcher}. */
+	private void stopWithError(Throwable throwable) {
+		this.error.compareAndSet(null, throwable);
+	}
+
 	/**
 	 * Deserializes a record for collection, and accordingly updates the shard state in the fetcher. The last
 	 * successfully collected sequence number in this shard consumer is also updated so that
@@ -263,20 +352,13 @@ private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
 	 */
 	private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
 		GetRecordsResult getRecordsResult = null;
-		while (getRecordsResult == null) {
-			try {
-				getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
-			} catch (ExpiredIteratorException eiEx) {
-				LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
-					" refreshing the iterator ...", shardItr, subscribedShard);
-				shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
-
-				// sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
-				if (fetchIntervalMillis != 0) {
-					Thread.sleep(fetchIntervalMillis);
-				}
-			}
+		try {
+			getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
+		} catch (ExpiredIteratorException eiEx) {
+			LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
+				" refreshing the iterator ...", shardItr, subscribedShard);
 		}
+
 		return getRecordsResult;
 	}
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Strengthen fetch interval implementation in Kinesis consumer
> ------------------------------------------------------------
>
>                 Key: FLINK-4574
>                 URL: https://issues.apache.org/jira/browse/FLINK-4574
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Wei-Che Wei
>            Priority: Major
>              Labels: pull-request-available
>
> As pointed out by [~rmetzger], the current fetch interval implementation in the {{ShardConsumer}} class of the Kinesis consumer can lead to much longer intervals than the one specified by the user. For example, if the specified fetch interval is {{f}}, it takes {{x}} to complete a {{getRecords()}} call, and {{y}} to process the fetched records for emitting, then the actual interval between fetches is {{f+x+y}}.
> The main problem with this is that we can never guarantee how much time has passed since the last {{getRecords}} call, and thus cannot guarantee that returned shard iterators will not have expired by the next time we use them, even if we limit the user-given value for {{f}} to be no longer than the iterator expiry time.
> I propose to improve this by using, per {{ShardConsumer}}, a {{ScheduledExecutorService}} / {{Timer}} to do the fixed-interval fetching, and a separate blocking queue that collects the fetched records for emitting.
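
For illustration only (this sketch is not part of the issue description or of the pull request above), the proposed pattern could look roughly like the following: a {{Timer}} triggers {{getRecords}} at a fixed rate and hands the results to an {{ArrayBlockingQueue}}, while the emitting thread drains the queue independently, so the fetch period stays close to {{f}} instead of growing to {{f+x+y}}. All class and method names below ({{FixedIntervalFetchSketch}}, {{RecordSource}}, {{Record}}) are hypothetical stand-ins, not the Flink connector's actual API.

{code:java}
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class FixedIntervalFetchSketch {

	// Stand-in for a single fetched record; the real consumer works with deaggregated Kinesis UserRecords.
	static final class Record {
		final String data;
		Record(String data) { this.data = data; }
	}

	// Stand-in for the Kinesis proxy; a real implementation would call the AWS GetRecords API.
	interface RecordSource {
		List<Record> getRecords(int maxRecords);
	}

	public static void main(String[] args) throws InterruptedException {
		final long fetchIntervalMillis = 200L;   // the user-specified interval f
		final ArrayBlockingQueue<Record> queue = new ArrayBlockingQueue<>(1000);
		final RecordSource source = max -> List.of(new Record("payload"));

		// Producer side: the Timer fires every fetchIntervalMillis regardless of how long the
		// fetch (x) or the downstream emission (y) takes, so the fetch period stays close to f
		// instead of growing to f + x + y as it does with a sleep-based loop.
		Timer fetchTimer = new Timer("shard-fetcher", true);
		fetchTimer.scheduleAtFixedRate(new TimerTask() {
			@Override
			public void run() {
				for (Record record : source.getRecords(100)) {
					queue.offer(record);   // hand over to the emitting thread (drops if full; a blocking put would be used in practice)
				}
			}
		}, 0L, fetchIntervalMillis);

		// Consumer side: the emitting thread drains the queue independently of the fetch schedule.
		long deadline = System.currentTimeMillis() + 1_000L;
		while (System.currentTimeMillis() < deadline) {
			Record record = queue.poll(10L, TimeUnit.MILLISECONDS);
			if (record != null) {
				System.out.println("emit " + record.data);
			}
		}
		fetchTimer.cancel();
	}
}
{code}

If a fetch round ever takes longer than the interval, {{Timer.scheduleAtFixedRate}} fires the next round late rather than silently stretching every subsequent period; the pull request above additionally detects that case and logs a warning when the configured interval is too short to finish {{getRecords}} on time.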



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)