Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2016/08/29 10:04:50 UTC

[GitHub] flink pull request #2432: [FLINK-4514][kinesis-connector] Handle unexpected ...

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/2432

    [FLINK-4514][kinesis-connector] Handle unexpected ExpiredIteratorExceptions

    Handle any unexpected {{ExpiredIteratorException}}s on {{getRecords()}} calls by refreshing the failing shard iterator with a new one.
    
    A user reported this issue when replaying Kinesis data over a wide time span; the consumer returned to normal once it had caught up with the latest data. I tried to reproduce the exception, but was unable to; the behaviour seems to be inconsistent.
    
    Therefore, this change treats the exception as "unexpected" by simply catching it and refreshing the iterator. There's no guarantee of how much time passes between getRecords() requests anyway, so this is a simple way to handle it.
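    
    In essence, the change boils down to the following retry loop around getRecords() (a condensed sketch of the ShardConsumer diff quoted later in this thread; logging is omitted, and the final return is implied by the truncated diff):
    
        private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
            GetRecordsResult getRecordsResult = null;
            while (getRecordsResult == null) {
                try {
                    getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
                } catch (ExpiredIteratorException eiEx) {
                    // refresh the expired iterator so that it points right after the last emitted record
                    shardItr = kinesis.getShardIterator(
                        subscribedShard,
                        ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(),
                        lastSequenceNum.getSequenceNumber());
                    // respect the configured fetch interval before retrying with the refreshed iterator
                    if (fetchIntervalMillis != 0) {
                        Thread.sleep(fetchIntervalMillis);
                    }
                }
            }
            return getRecordsResult;
        }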

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-4514

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2432.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2432
    
----
commit df833ddbca9971b5f03417efb65527408a8ad9c4
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Date:   2016-08-29T09:30:39Z

    [FLINK-4514][kinesis-connector] Handle unexpected ExpiredIteratorExceptions

----



[GitHub] flink issue #2432: [FLINK-4514][kinesis-connector] Handle unexpected Expired...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2432
  
    Max seems to have just hotfixed the failing flink-mesos tests.
    Rebasing this PR on latest master. Merging this once Travis turns green.
    I'll open a separate JIRA to improve the fetch interval implementation.



[GitHub] flink issue #2432: [FLINK-4514][kinesis-connector] Handle unexpected Expired...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2432
  
    Merging ...



[GitHub] flink issue #2432: [FLINK-4514][kinesis-connector] Handle unexpected Expired...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2432
  
    +1



[GitHub] flink issue #2432: [FLINK-4514][kinesis-connector] Handle unexpected Expired...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2432
  
    I think a check that the interval is lower than 5 minutes is sufficient. Setting the limit to 4.5 minutes seems a bit too strict; you never know whether some advanced user wants to cover a very specific use case.



[GitHub] flink issue #2432: [FLINK-4514][kinesis-connector] Handle unexpected Expired...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2432
  
    Thanks for the confirmation.
    The build will probably fail again due to an unused import (https://travis-ci.org/tzulitai/flink/jobs/156191304) that was just hotfixed, so we need to rebase again. But I think it's ok to merge this now, because the tests for all the connectors passed on the last run before the rebase onto the bucketed rolling sink.



[GitHub] flink issue #2432: [FLINK-4514][kinesis-connector] Handle unexpected Expired...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2432
  
    @rmetzger I know it might be a bit of a rush, but could you have a quick look at this too?
    It's not a critical blocker, but it would be good to get it into the 1.1.2 patch freeze.



[GitHub] flink pull request #2432: [FLINK-4514][kinesis-connector] Handle unexpected ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2432



[GitHub] flink pull request #2432: [FLINK-4514][kinesis-connector] Handle unexpected ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2432#discussion_r76620194
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -219,19 +228,52 @@ private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
     			subscribedShard.getStreamName(),
     			subscribedShard.getShard().getShardId());
     
    -		if (record.isAggregated()) {
    -			fetcherRef.emitRecordAndUpdateState(
    -				value,
    -				approxArrivalTimestamp,
    -				subscribedShardStateIndex,
    -				new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber()));
    -		} else {
    -			fetcherRef.emitRecordAndUpdateState(
    -				value,
    -				approxArrivalTimestamp,
    -				subscribedShardStateIndex,
    -				new SequenceNumber(record.getSequenceNumber()));
    +		SequenceNumber collectedSequenceNumber = (record.isAggregated())
    +			? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())
    +			: new SequenceNumber(record.getSequenceNumber());
    +
    +		fetcherRef.emitRecordAndUpdateState(
    +			value,
    +			approxArrivalTimestamp,
    +			subscribedShardStateIndex,
    +			collectedSequenceNumber);
    +
    +		lastSequenceNum = collectedSequenceNumber;
    +	}
    +
    +	/**
    +	 * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
    +	 * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
    +	 * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
    +	 * be used for the next call to this method.
    +	 *
    +	 * Note: it is important that this method is not called again before all the records from the last result have been
    +	 * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
    +	 * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
    +	 * incorrect shard iteration if the iterator had to be refreshed.
    +	 *
    +	 * @param shardItr shard iterator to use
    +	 * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
    +	 * @return get records result
    +	 * @throws InterruptedException
    +	 */
    +	private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
    +		GetRecordsResult getRecordsResult = null;
    +		while (getRecordsResult == null) {
    +			try {
    +				getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
    +			} catch (ExpiredIteratorException eiEx) {
    +				LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
    +					" refreshing the iterator ...", shardItr, subscribedShard);
    +				shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
    +
    +				// sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
    +				if (fetchIntervalMillis != 0) {
    +					Thread.sleep(fetchIntervalMillis);
    +				}
    --- End diff --
    
    Sorry for the race commit, I didn't realize you were still reviewing.
    
    I agree. So, if we allow the fetch interval configuration to go up to 5 minutes, a refreshed iterator can expire again before the next attempt, and we'd likely get stuck in this loop indefinitely, right? I think that was what I had in mind with the stricter 4.5 min limit, to make sure this doesn't happen :P But still, logically, we never know what `n` will be.



[GitHub] flink issue #2432: [FLINK-4514][kinesis-connector] Handle unexpected Expired...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2432
  
    The latest commit sets the check to be less than 5 minutes.



[GitHub] flink issue #2432: [FLINK-4514][kinesis-connector] Handle unexpected Expired...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2432
  
    I think it'll also make sense to limit the config setting `ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS` to be lower than the shard iterator expiry time; otherwise the shard iterator will definitely time out on the next `getRecords()`.
    
    AWS documentation says the expiry is 5 minutes (http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html). I propose setting the limit to 4.5 min, although I don't expect any user would actually set such a high value.
    
    Adding this now...
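    
    Concretely, something along these lines (a sketch only; the helper name and error message are illustrative, and the exact threshold, 4.5 vs 5 minutes, is what's under discussion):
    
        // Shard iterators expire 5 minutes after GetShardIterator returns them,
        // per the AWS API reference linked above.
        private static final long ITERATOR_EXPIRY_MILLIS = 5 * 60 * 1000L;
    
        // Hypothetical validation helper; the real check would live in the
        // connector's configuration validation.
        static void checkGetRecordsInterval(long fetchIntervalMillis) {
            if (fetchIntervalMillis >= ITERATOR_EXPIRY_MILLIS) {
                throw new IllegalArgumentException(
                    "SHARD_GETRECORDS_INTERVAL_MILLIS must be lower than the shard iterator"
                        + " expiry of " + ITERATOR_EXPIRY_MILLIS + " ms, but was "
                        + fetchIntervalMillis + " ms");
            }
        }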



[GitHub] flink pull request #2432: [FLINK-4514][kinesis-connector] Handle unexpected ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2432#discussion_r76617364
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -219,19 +228,52 @@ private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
     			subscribedShard.getStreamName(),
     			subscribedShard.getShard().getShardId());
     
    -		if (record.isAggregated()) {
    -			fetcherRef.emitRecordAndUpdateState(
    -				value,
    -				approxArrivalTimestamp,
    -				subscribedShardStateIndex,
    -				new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber()));
    -		} else {
    -			fetcherRef.emitRecordAndUpdateState(
    -				value,
    -				approxArrivalTimestamp,
    -				subscribedShardStateIndex,
    -				new SequenceNumber(record.getSequenceNumber()));
    +		SequenceNumber collectedSequenceNumber = (record.isAggregated())
    +			? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())
    +			: new SequenceNumber(record.getSequenceNumber());
    +
    +		fetcherRef.emitRecordAndUpdateState(
    +			value,
    +			approxArrivalTimestamp,
    +			subscribedShardStateIndex,
    +			collectedSequenceNumber);
    +
    +		lastSequenceNum = collectedSequenceNumber;
    +	}
    +
    +	/**
    +	 * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
    +	 * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
    +	 * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
    +	 * be used for the next call to this method.
    +	 *
    +	 * Note: it is important that this method is not called again before all the records from the last result have been
    +	 * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
    +	 * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
    +	 * incorrect shard iteration if the iterator had to be refreshed.
    +	 *
    +	 * @param shardItr shard iterator to use
    +	 * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
    +	 * @return get records result
    +	 * @throws InterruptedException
    +	 */
    +	private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
    +		GetRecordsResult getRecordsResult = null;
    +		while (getRecordsResult == null) {
    +			try {
    +				getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
    +			} catch (ExpiredIteratorException eiEx) {
    +				LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
    +					" refreshing the iterator ...", shardItr, subscribedShard);
    +				shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
    +
    +				// sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
    +				if (fetchIntervalMillis != 0) {
    +					Thread.sleep(fetchIntervalMillis);
    +				}
    --- End diff --
    
    This fetchInterval implementation can lead to much larger effective fetch intervals.
    If the getRecords call itself takes `n` milliseconds, the time between consecutive `getRecords` calls is `n + fetchInterval`.
    We don't need to fix this in this PR, but I think we should fix it in general (if you agree). We also need to see how to make this efficient (System.currentTimeMillis() is a somewhat expensive call).
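    
    One possible shape for the fix, as a sketch (it reuses the fetchIntervalMillis field and getRecords method from the diff above, and keeps System.currentTimeMillis() despite the cost concern; a cheaper clock source would be part of the follow-up):
    
        // Subtract the time spent inside getRecords from the sleep, so that
        // consecutive calls are ~fetchIntervalMillis apart instead of
        // n + fetchIntervalMillis.
        long callStart = System.currentTimeMillis();
        GetRecordsResult getRecordsResult = getRecords(shardItr, maxNumberOfRecords);
        long remainingWait = fetchIntervalMillis - (System.currentTimeMillis() - callStart);
        if (remainingWait > 0) {
            Thread.sleep(remainingWait);
        }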
    


