Posted to user@flink.apache.org by Steffen Hausmann <st...@hausmann-family.de> on 2016/10/04 18:55:41 UTC

ExpiredIteratorException when reading from a Kinesis stream

Hi there,

I'm running a Flink 1.1.2 job on EMR and Yarn that is reading events 
from a Kinesis stream. However, after a while (the exact duration varies 
and is on the order of minutes) the Kinesis source doesn't emit any
further events and hence Flink doesn't produce any further output.
Eventually, an ExpiredIteratorException occurs in one of the tasks,
causing the entire job to fail:

> com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator expired. The iterator was created at time Mon Oct 03 18:40:30 UTC 2016 while right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in the future than the tolerated delay of 300000 milliseconds. (Service: AmazonKinesis; Status Code: 400; Error Code: ExpiredIteratorException; Request ID: dace9532-9031-54bc-8aa2-3cbfb136d590)

This seems to be related to FLINK-4514, which is marked as resolved for
Flink 1.1.2. In contrast to what is described in the ticket, the job I'm
running isn't suspended but hangs just a few minutes after the job has
been started.

I've attached a log file showing the described behavior.

Any idea what may be wrong?

Thanks,
Steffen
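
For reference, the timestamps in the exception above work out as follows. This is only an illustrative calculation in Python (the variable names are made up for the example); the two times and the 300000 ms limit are taken directly from the error message.

```python
from datetime import datetime, timedelta, timezone

# Values copied from the ExpiredIteratorException message above.
TOLERATED_DELAY = timedelta(milliseconds=300000)
created = datetime(2016, 10, 3, 18, 40, 30, tzinfo=timezone.utc)
used = datetime(2016, 10, 3, 18, 45, 33, tzinfo=timezone.utc)

# Age of the shard iterator at the moment it was used again.
age = used - created
iterator_expired = age > TOLERATED_DELAY

print(age, iterator_expired)  # prints: 0:05:03 True
```

So the iterator was 5 minutes and 3 seconds old when it was used, i.e. the source waited just over the 5-minute limit between two consecutive fetches.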

Re: ExpiredIteratorException when reading from a Kinesis stream

Posted by Josh <jo...@gmail.com>.
I've reset the state and the job appears to be running smoothly again now.
My guess is that this problem was somehow related to my state becoming too
large (it got to around 20GB before the problem began). I would still like
to get to the bottom of what caused this as resetting the job's state is
not a long-term solution, so if anyone has any ideas I can restore the
large state and investigate further.

Btw sorry for going a bit off topic on this thread!

Re: ExpiredIteratorException when reading from a Kinesis stream

Posted by Josh <jo...@gmail.com>.
Hi Scott & Stephan,

The problem has happened a couple more times since yesterday. It's very
strange, as my job was running fine for over a week before this started
happening. I find that if I restart the job (and restore from the last
checkpoint) it runs fine for a while (a couple of hours) before breaking
again.

@Scott thanks, I'll try testing with the upgraded versions, though since my
job was running fine for over a week it feels like there might be something
else going on here.

@Stephan I see, my sink is a Kafka topic. I only have two nodes in my Kafka
cluster and CPU and memory usage seem normal on both nodes. I can't see
anything bad in the task manager logs relating to the Kafka producer
either. I do have a fairly large state (20GB) but I'm using the latest
RocksDB state backend (with asynchronous checkpointing). I'm not sure what
else I can do to investigate this, but let me know if you have any more
ideas!

Thanks,
Josh


Re: ExpiredIteratorException when reading from a Kinesis stream

Posted by Stephan Ewen <se...@apache.org>.
Is it possible that you have stalls in your topology?

Reasons could be:

  - The data sink blocks or becomes slow for some periods (where are you
sending the data to?)

  - If you are using large state and a state backend that only supports
synchronous checkpointing, there may be a delay introduced by the checkpoint
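
To illustrate the first point, here is a toy model in Python of how a slow sink can stall a source through a bounded buffer. It is not Flink code: `run_source`, `drain_every` and the buffer size are hypothetical names and values chosen for the example, and the bounded `queue.Queue` only stands in for the limited buffering between operators.

```python
import queue


def run_source(records, buffer, drain_every):
    """Emit records into a bounded buffer while a sink drains it.

    The sink removes one item on every `drain_every`-th attempt.
    Returns the number of attempts on which the source could not emit
    because the buffer was full.
    """
    stalled = 0
    attempt = 0
    pending = list(records)
    while pending:
        attempt += 1
        # The (possibly slow) sink takes one record out of the buffer.
        if attempt % drain_every == 0 and not buffer.empty():
            buffer.get_nowait()
        try:
            buffer.put_nowait(pending[0])
            pending.pop(0)
        except queue.Full:
            # Buffer is full: the source has to wait before emitting.
            stalled += 1
    return stalled


fast = run_source(range(6), queue.Queue(maxsize=2), drain_every=1)
slow = run_source(range(6), queue.Queue(maxsize=2), drain_every=3)
print(fast, slow)
```

With the fast sink the source never has to wait; with the slow sink most attempts are spent waiting. In a real job that waiting time counts against the lifetime of the shard iterator the source is holding.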


Re: ExpiredIteratorException when reading from a Kinesis stream

Posted by Scott Kidder <ki...@gmail.com>.
Hi Steffen & Josh,

For what it's worth, I've been using the Kinesis connector with very good
results on Flink 1.1.2 and 1.1.3. I updated the Flink Kinesis connector KCL
and AWS SDK dependencies to the following versions:

aws.sdk.version: 1.11.34
aws.kinesis-kcl.version: 1.7.0

My customizations are visible in this commit on my fork:
https://github.com/apache/flink/commit/6d69f99d7cd52b3c2f039cb4d37518859e159b32

It might be worth testing with newer AWS SDK & KCL libraries to see if the
problem persists.

Best,

--Scott Kidder


> On Thu, Nov 3, 2016 at 12:44 PM, Tzu-Li (Gordon) Tai <tz...@apache.org>
> wrote:
>
>> Hi Josh,
>>
>> That warning message was added as part of FLINK-4514. It is logged
>> whenever a shard iterator is used more than 5 minutes after it was
>> returned by Kinesis.
>> The only time spent between a shard iterator being returned and it being
>> used to fetch the next batch of records is on deserializing and emitting
>> the records of the last fetched batch.
>> So unless processing of the last fetched batch took over 5 minutes, this
>> normally shouldn’t happen.
>>
>> Have you noticed any sign of long, constant full GC for your Flink task
>> managers? From your description and from checking the code, the only guess
>> I can come up with now is that the source tasks completely ceased running
>> for a period of time, and when they came back, the shard iterator was
>> unexpectedly found to be expired. According to the graph you attached,
>> when the iterator was refreshed and tasks successfully fetched a few more
>> batches, the source tasks again halted, and so on.
>> So you should see that same warning message right before every small peak
>> within the graph.
>>
>> Best Regards,
>> Gordon
>>
>>
>> On November 3, 2016 at 7:46:42 PM, Josh (jofo90@gmail.com) wrote:
>>
>> Hey Gordon,
>>
>> I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514)
>> with no problems, but yesterday the Kinesis consumer started behaving
>> strangely... My Kinesis data stream is fairly constant at around 1.5MB/sec,
>> however the Flink Kinesis consumer began to pause consuming for periods of
>> time (see the spikes in the attached graph, which shows data consumed by
>> the Flink Kinesis consumer).
>>
>> Looking in the task manager logs, there are no exceptions; however, there
>> is this log message, which I believe is related to the problem:
>>
>> 2016-11-03 09:27:53,782 WARN  org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer  - Encountered an unexpected expired iterator
>> AAAAAAAAAAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore
>> for shard KinesisStreamShard{streamName='stream001', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 85070511730234615865841151857942042863},SequenceNumberRange: {StartingSequenceNumber: 49566542916923648892164247926679091159472198219567464450,}}'}; refreshing the iterator ...
>>
>> Having restarted the job from my last savepoint, it's consuming the
>> stream fine again with no problems.
>>
>> Do you have any idea what might be causing this, or anything I should do
>> to investigate further?
>>
>> Cheers,
>>
>> Josh
>>
>> On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai <tz...@apache.org>
>> wrote:
>>
>>> Hi Steffen,
>>>
>>> Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in
>>> the release (I’ll update the resolve version in JIRA to 1.1.3, thanks for
>>> noticing this!).
>>> The Flink community is going to release 1.1.3 asap, which will include
>>> the fix.
>>> If you don’t want to wait for the release and want to try the fix now,
>>> you can also build on the current “release-1.1” branch, which already has
>>> FLINK-4514 merged.
>>> Sorry for the inconvenience. Let me know if you bump into any other
>>> problems afterwards.
>>>
>>> Best Regards,
>>> Gordon

Re: ExpiredIteratorException when reading from a Kinesis stream

Posted by Josh <jo...@gmail.com>.
Hi Gordon,

Thanks for the fast reply!
You're right about the expired iterator exception occurring just before
each spike. I can't see any signs of long GC on the task managers... CPU
has been <15% the whole time when the spikes were taking place and I can't
see anything unusual in the task manager logs.

But actually I just noticed that the Flink UI showed no successful
checkpoints during the time of the problem even though my checkpoint
interval is 15 minutes. So I guess this is probably some kind of Flink
problem rather than a problem with the Kinesis consumer. Unfortunately I
can't find anything useful in the logs so not sure what happened!

Josh



On Thu, Nov 3, 2016 at 12:44 PM, Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi Josh,
>
> That warning message was added as part of FLINK-4514. It is logged whenever
> a shard iterator is used more than 5 minutes after it was returned from
> Kinesis.
> The only time spent between a shard iterator being returned and it being
> used to fetch the next batch of records is on deserializing and emitting
> the records of the last fetched batch.
> So unless processing of the last fetched batch took over 5 minutes, this
> normally shouldn’t happen.
>
> Have you noticed any sign of long, constant full GC for your Flink task
> managers? From your description and from checking the code, the only guess
> I can come up with now is that the source tasks completely ceased running
> for a period of time, and when they came back, the shard iterator was
> unexpectedly found to be expired.
> According to the graph you attached, once the iterator was refreshed and
> the tasks had successfully fetched a few more batches, the source tasks
> halted again, and so on.
> So you should see that same warning message right before every small peak
> within the graph.
>
> Best Regards,
> Gordon
>
>
>

Re: ExpiredIteratorException when reading from a Kinesis stream

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Josh,

That warning message was added as part of FLINK-4514. It is logged whenever a shard iterator is used more than 5 minutes after it was returned from Kinesis.
The only time spent between a shard iterator being returned and it being used to fetch the next batch of records is on deserializing and emitting the records of the last fetched batch.
So unless processing of the last fetched batch took over 5 minutes, this normally shouldn’t happen.
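
For readers following along, here is a minimal sketch of the refresh-on-expiry idea described above. It is not the connector's code: FakeShard, ShardIterator, Batch and fetchWithRefresh are hypothetical stand-ins invented for illustration, and the sketch assumes that "refreshing the iterator" means requesting a new iterator at the last consumed position and retrying the fetch. The 303-second stall mirrors the timestamps in the exception (18:40:30 to 18:45:33), which exceed the 300000 ms limit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustration only: none of these types come from Flink or the AWS SDK. */
public class ShardIteratorRefreshSketch {

    /** 5 minutes, the tolerated delay quoted in the exception message. */
    static final long ITERATOR_TTL_MS = 300_000L;

    /** Hypothetical stand-in for the service's ExpiredIteratorException. */
    static class ExpiredIteratorException extends RuntimeException {
        ExpiredIteratorException(String message) { super(message); }
    }

    /** Hypothetical iterator: a read position plus the time it was issued. */
    static final class ShardIterator {
        final int position;
        final long createdAtMs;
        ShardIterator(int position, long createdAtMs) {
            this.position = position;
            this.createdAtMs = createdAtMs;
        }
    }

    /** Hypothetical fetch result: the records plus the iterator for the next fetch. */
    static final class Batch {
        final List<String> records;
        final ShardIterator nextIterator;
        Batch(List<String> records, ShardIterator nextIterator) {
            this.records = records;
            this.nextIterator = nextIterator;
        }
    }

    /** Fake shard with a manually advanced clock so the expiry can be simulated. */
    static final class FakeShard {
        private final List<String> data;
        long nowMs = 0;

        FakeShard(List<String> data) { this.data = data; }

        ShardIterator iteratorAt(int position) {
            return new ShardIterator(position, nowMs);
        }

        Batch getRecords(ShardIterator iterator, int limit) {
            if (nowMs - iterator.createdAtMs > ITERATOR_TTL_MS) {
                throw new ExpiredIteratorException("Iterator expired");
            }
            int end = Math.min(data.size(), iterator.position + limit);
            List<String> records = new ArrayList<>(data.subList(iterator.position, end));
            return new Batch(records, iteratorAt(end));
        }
    }

    /** Fetches one batch; on expiry, requests a fresh iterator at the same position and retries once. */
    static Batch fetchWithRefresh(FakeShard shard, ShardIterator iterator, int limit) {
        try {
            return shard.getRecords(iterator, limit);
        } catch (ExpiredIteratorException e) {
            ShardIterator fresh = shard.iteratorAt(iterator.position);
            return shard.getRecords(fresh, limit);
        }
    }

    public static void main(String[] args) {
        FakeShard shard = new FakeShard(Arrays.asList("a", "b", "c", "d"));
        Batch first = fetchWithRefresh(shard, shard.iteratorAt(0), 2);
        shard.nowMs += 303_000L; // simulate the source task stalling for 5 min 3 s
        Batch second = fetchWithRefresh(shard, first.nextIterator, 2);
        System.out.println(first.records + " " + second.records);
    }
}
```

Without the catch block, the second fetch would fail with the expired-iterator error instead of resuming at the position where the first batch ended.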

Have you noticed any sign of long, constant full GC for your Flink task managers? From your description and from checking the code, the only guess I can come up with now is that
the source tasks completely ceased running for a period of time, and when they came back, the shard iterator was unexpectedly found to be expired. According to the graph you attached,
once the iterator was refreshed and the tasks had successfully fetched a few more batches, the source tasks halted again, and so on.
So you should see that same warning message right before every small peak within the graph.
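
One rough way to check for long GC pauses from inside a JVM is the standard java.lang.management API. The sketch below is only an illustration: the class name GcPauseProbe and the one-second sampling window are made up, and it measures the JVM it runs in, so for a task manager the GC logs of that process would likely be the more practical source.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

/** Illustration only: samples the cumulative GC time of the current JVM. */
public class GcPauseProbe {

    /** Sum of the cumulative collection time (ms) across all collectors of this JVM. */
    static long totalGcTimeMs() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long timeMs = gc.getCollectionTime(); // -1 if the collector does not report it
            if (timeMs > 0) {
                total += timeMs;
            }
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        long before = totalGcTimeMs();
        Thread.sleep(1000); // hypothetical sampling window
        long after = totalGcTimeMs();
        System.out.println("GC time during the window: " + (after - before) + " ms");
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.println(gc.getName() + ": count=" + gc.getCollectionCount()
                    + ", time=" + gc.getCollectionTime() + " ms");
        }
    }
}
```

If the GC time per window stays close to the length of the window itself, the JVM is spending most of its time collecting, which would fit the stalled source tasks described above.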

Best Regards,
Gordon


On November 3, 2016 at 7:46:42 PM, Josh (jofo90@gmail.com) wrote:

Hey Gordon,

I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514) with no problems, but yesterday the Kinesis consumer started behaving strangely. My Kinesis data stream is fairly constant at around 1.5MB/sec; however, the Flink Kinesis consumer began to stop consuming for periods of time (see the spikes in the attached graph, which shows the data consumed by the Flink Kinesis consumer).

Looking in the task manager logs, there are no exceptions; however, there is this log message, which I believe is related to the problem:
2016-11-03 09:27:53,782 WARN  org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer  - Encountered an unexpected expired iterator AAAAAAAAAAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore for shard KinesisStreamShard{streamName='stream001', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 85070511730234615865841151857942042863},SequenceNumberRange: {StartingSequenceNumber: 49566542916923648892164247926679091159472198219567464450,}}'}; refreshing the iterator ...

Having restarted the job from my last savepoint, it's consuming the stream fine again with no problems.

Do you have any idea what might be causing this, or anything I should do to investigate further?

Cheers,

Josh




Re: ExpiredIteratorException when reading from a Kinesis stream

Posted by Josh <jo...@gmail.com>.
Hey Gordon,

I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514) with
no problems, but yesterday the Kinesis consumer started behaving
strangely. My Kinesis data stream is fairly constant at around 1.5MB/sec;
however, the Flink Kinesis consumer began to stop consuming for periods of
time (see the spikes in the attached graph, which shows the data consumed
by the Flink Kinesis consumer).

Looking in the task manager logs, there are no exceptions; however, there is
this log message, which I believe is related to the problem:

2016-11-03 09:27:53,782 WARN  org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer  - Encountered an unexpected expired iterator AAAAAAAAAAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore for shard KinesisStreamShard{streamName='stream001', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 85070511730234615865841151857942042863},SequenceNumberRange: {StartingSequenceNumber: 49566542916923648892164247926679091159472198219567464450,}}'}; refreshing the iterator ...

Having restarted the job from my last savepoint, it's consuming the stream
fine again with no problems.

Do you have any idea what might be causing this, or anything I should do to
investigate further?

Cheers,

Josh

On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi Steffen,
>
> It turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in
> the release (I’ll update the fix version in JIRA to 1.1.3, thanks for
> noticing this!).
> The Flink community is going to release 1.1.3 ASAP, which will include the
> fix.
> If you don’t want to wait for the release and want to try the fix now, you
> can also build from the current “release-1.1” branch, which already has
> FLINK-4514 merged.
> Sorry for the inconvenience. Let me know if you run into any other
> problems afterwards.
>
> Best Regards,
> Gordon
>
>

Re: ExpiredIteratorException when reading from a Kinesis stream

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Steffen,

It turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in the release (I’ll update the fix version in JIRA to 1.1.3, thanks for noticing this!).
The Flink community is going to release 1.1.3 ASAP, which will include the fix.
If you don’t want to wait for the release and want to try the fix now, you can also build from the current “release-1.1” branch, which already has FLINK-4514 merged.
Sorry for the inconvenience. Let me know if you run into any other problems afterwards.

Best Regards,
Gordon


On October 5, 2016 at 2:56:21 AM, Steffen Hausmann (steffen@hausmann-family.de) wrote:

Hi there,

I'm running a Flink 1.1.2 job on EMR and YARN that is reading events
from a Kinesis stream. However, after a while (the exact duration varies
and is on the order of minutes) the Kinesis source doesn't emit any
further events and hence Flink doesn't produce any further output.
Eventually, an ExpiredIteratorException occurs in one of the tasks,
causing the entire job to fail:

> com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator expired. The iterator was created at time Mon Oct 03 18:40:30 UTC 2016 while right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in the future than the tolerated delay of 300000 milliseconds. (Service: AmazonKinesis; Status Code: 400; Error Code: ExpiredIteratorException; Request ID: dace9532-9031-54bc-8aa2-3cbfb136d590)

This seems to be related to FLINK-4514, which is marked as resolved for
Flink 1.1.2. In contrast to what is described in the ticket, the job I'm
running isn't suspended but hangs just a few minutes after the job has
been started.

I've attached a log file showing the described behavior.

Any idea what may be wrong?

Thanks,
Steffen