You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Tzu-Li (Gordon) Tai (JIRA)" <ji...@apache.org> on 2016/08/18 10:28:20 UTC

[jira] [Comment Edited] (FLINK-4341) Kinesis connector does not emit maximum watermark properly

    [ https://issues.apache.org/jira/browse/FLINK-4341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15426232#comment-15426232 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4341 at 8/18/16 10:28 AM:
----------------------------------------------------------------------

I can take a look at fixing this after I get back from vacation this week :) Thanks [~aljoscha] for pointing this out.

One question, what will be a reasonable way to fix this? The rework in 17dfd68d05be991b58ffe4b9252b09ca61ec8f05 took away the {{Long.MAX_VALUE}} because parallel source instances that initially did not have shards assigned to it may still have shards due to resharding of Kinesis streams in the future. I'm working on Kafka repartitioning too for our Kafka connectors, and we'll likely need to deal with this there too.


was (Author: tzulitai):
I can take a look at fixing this after I get back from vacation this week :) Thanks [~aljoscha] for pointing this out.

One question, what will be a reasonable way to fix this? The rework in 17dfd68d05be991b58ffe4b9252b09ca61ec8f05 took away the {{Long.MAX_VALUE}} because parallel source instances that initially did not have shards assigned to it may still have shards due to resharding of Kinesis streams in the future. I'm working on Kafka repartitioning too for our connectors, and we'll likely need to deal with this there too.

> Kinesis connector does not emit maximum watermark properly
> ----------------------------------------------------------
>
>                 Key: FLINK-4341
>                 URL: https://issues.apache.org/jira/browse/FLINK-4341
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.1.0, 1.1.1
>            Reporter: Scott Kidder
>            Assignee: Robert Metzger
>            Priority: Blocker
>             Fix For: 1.2.0, 1.1.2
>
>
> **Prevously reported as "Checkpoint state size grows unbounded when task parallelism not uniform"**
> This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I was previously using a 1.1.0 snapshot (commit 18995c8) which performed as expected.  This issue was introduced somewhere between those commits.
> I've got a Flink application that uses the Kinesis Stream Consumer to read from a Kinesis stream with 2 shards. I've got 2 task managers with 2 slots each, providing a total of 4 slots.  When running the application with a parallelism of 4, the Kinesis consumer uses 2 slots (one per Kinesis shard) and 4 slots for subsequent tasks that process the Kinesis stream data. I use an in-memory store for checkpoint data.
> Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that checkpoint states were growing unbounded when running with a parallelism of 4, checkpoint interval of 10 seconds:
> {code}
> ID  State Size
> 1   11.3 MB
> 2    20.9 MB
> 3   30.6 MB
> 4   41.4 MB
> 5   52.6 MB
> 6   62.5 MB
> 7   71.5 MB
> 8   83.3 MB
> 9   93.5 MB
> {code}
> The first 4 checkpoints generally succeed, but then fail with an exception like the following:
> {code}
> java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758)
>   at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>   at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=12105407 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
>   at org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:146)
>   at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetBytes(MemoryStateBackend.java:200)
>   at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetHandle(MemoryStateBackend.java:190)
>   at org.apache.flink.runtime.state.AbstractStateBackend$CheckpointStateOutputView.closeAndGetHandle(AbstractStateBackend.java:447)
>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:879)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:598)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:762)
>   ... 8 more
> {code}
> Or:
> {code}
> 2016-08-09 17:44:43,626 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Restoring checkpointed state to task Fold: property_id, player -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
> 2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter            - Transient association error (association remains live) akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@10.55.2.212:6123/user/jobmanager#510517238]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint was 10891825 bytes.
> {code}
> This can be fixed by simply submitting the job with a parallelism of 2. I suspect there was a regression introduced relating to assumptions about the number of sub-tasks associated with a job stage (e.g. assuming 4 instead of a value ranging from 1-4). This is currently preventing me from using all available Task Manager slots.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)