You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Josson Paul <jo...@gmail.com> on 2020/10/23 05:36:21 UTC

Re: Flink 1.8.3 GC issues

@Piotr Nowojski <pn...@apache.org>  @Nico Kruber <nk...@apache.org>

An update.

I am able to figure out the problem code. A change in the Apache Beam code
is causing this problem.





Beam introduced a lock on the “emit” in Unbounded Source. The lock is on
the Flink’s check point lock. Now the same lock is used by Flink’s timer
service to emit the Watermarks. Flink’s timer service is starved to get
hold of the lock and for some reason it never gets that lock. Aftereffect
 of this situation is that the ‘WaterMark’ is never emitted by Flink’s
timer service.  Because there is no Watermarks flowing through the system,
Sliding Windows are never closed. Data gets accumulated in the Window.



This problem occurs only if we have external lookup calls (like Redis)
happen before the data goes to Sliding Window. Something like below.



KafkaSource à Transforms (Occasional Redis
lookup)->SlidingWindow->Transforms->Kafka Sink





https://github.com/apache/beam/blob/60e0a22ea95921636c392b5aae77cb48196dd700/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L256
. This is Beam 2.4 and you can see that there is no synchronized block at
line 257 and 270.



https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L264
. This is Beam 2.15. See the synchronized block introduced in line 264 and
280. We are using Beam 2.15 and Flink 1.8.



Beam introduced this synchronized block because of this bug.
https://issues.apache.org/jira/browse/BEAM-3087



After I removed that synchronized keyword everything started working fine
in my application.



What do you guys think about this?. Why does Beam need a Synchronized block
there?



Beam is using this lock ->

https://github.com/apache/flink/blob/d54807ba10d0392a60663f030f9fe0bfa1c66754/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L282



Thanks,

Josson

On Mon, Sep 14, 2020 at 5:03 AM Piotr Nowojski <pn...@apache.org> wrote:

> Hi Josson,
>
> The TM logs that you attached are only from a 5 minutes time period. Are
> you sure they are encompassing the period before the potential failure and
> after the potential failure? It would be also nice if you would provide the
> logs matching to the charts (like the one you were providing in the
> previous messages), to correlate events (spike in latency/GC with some
> timestamp from the logs).
>
> I was not asking necessarily to upgrade to Java9, but an updated/bug fixed
> version of Java8 [1].
>
> > 1) In Flink 1.4 set up, the data in the Heap is throttled. It never goes
> out of memory whatever be the ingestion rate. our Windows are 5
> minutes windows.
> > 2) In Flink 1.8 set up, HeapKeyedStateBackend is never throttled and
> fills up fast. When Old-gen space goes beyond 60-70% even the Mixed GC or
> Full GC doesn't reclaim space.
>
> In both cases there is the same mechanism for the backpressure. If a
> task's output runs out of buffers to put produced records, it will block
> the task. It can be that between 1.4 and 1.8, with credit based flow
> control changes, the amount of available buffers for the tasks on your
> setup has grown, so the tasks are backpressuring later. This in turn can
> sometimes mean that at any point of time there is more data buffered on the
> operator's state, like `WindowOperator`. I'm not sure what's the
> best/easiest way how to check this:
>
> 1. the amount of buffered data might be visible via metrics [2][3]
> 2. if you enable DEBUG logs, it should be visible via:
>
> > LOG.debug("Using a local buffer pool with {}-{} buffers",
> numberOfRequiredMemorySegments, maxNumberOfMemorySegments);
>
> entry logged by
> `org.apache.flink.runtime.io.network.buffer.LocalBufferPool`.
>
> Piotrek
>
> [1] https://en.wikipedia.org/wiki/Java_version_history#Java_8_updates
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#network
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#network
>
> pon., 14 wrz 2020 o 05:04 Josson Paul <jo...@gmail.com> napisał(a):
>
>> @Piotr Nowojski <pn...@apache.org> @Nico Kruber <nk...@apache.org>
>> I have attached the  Taskmanager/GC/thread dumps in a zip file.
>>
>> I don't see any issues in the TM logs.
>> Tried to upgrade to Java 9. Flink is on top of another platform which
>> threw errors while upgrading to Java 9. I can't do much for now. We will
>> upgrade to Jdk 11 in another 2 months.
>>
>> Regarding the Heap size. The new experiment I did was on 4gb Heap on both
>> Flink 1.4 and Flink 1.8.
>>
>> Questions I am trying to get answered are
>>
>> 1) In Flink 1.4 set up, the data in the Heap is throttled. It never goes
>> out of memory whatever be the ingestion rate. our Windows are 5
>> minutes windows.
>> 2) In Flink 1.8 set up, HeapKeyedStateBackend is never throttled and
>> fills up fast. When Old-gen space goes beyond 60-70% even the Mixed GC or
>> Full GC doesn't reclaim space.
>>
>>
>> On Fri, Sep 11, 2020 at 12:58 AM Piotr Nowojski <pn...@apache.org>
>> wrote:
>>
>>> Hi Josson,
>>>
>>> Have you checked the logs as Nico suggested? At 18:55 there is a dip in
>>> non-heap memory, just about when the problems started happening. Maybe you
>>> could post the TM logs?
>>> Have you tried updating JVM to a newer version?
>>> Also it looks like the heap size is the same between 1.4 and 1.8, but in
>>> an earlier message you said you increased it by 700MB?
>>>
>>> Piotrek
>>>
>>> pt., 11 wrz 2020 o 05:07 Josson Paul <jo...@gmail.com> napisał(a):
>>>
>>>> I have attached two word documents.
>>>> Flink1.4 and Flink1.8
>>>> I reduced the heap size in the cluster and tried the experiment in both
>>>> Flink 1.4 and Flink 1.8.
>>>> My goal was to simulate ingestion rate of 200 Clients/sec (Not going
>>>> into the details here).
>>>>
>>>> In Flink 1.4 I could reach 200 Clients/Sec and I ran the cluster for 1
>>>> hour. You can see the details in the attached Flink1.4 document file. You
>>>> can see the GC activity and Cpu. Both are holding good.
>>>>
>>>> In Flin 1.8 I could reach only 160 Clients/Sec and the issue started
>>>> happening. Issue started within 15 minutes of starting the ingestion. @Piotr
>>>> Nowojski <pn...@apache.org> , you can see that there is no meta
>>>> space related issue. All the GC related details are available in the doc.
>>>>
>>>> Especially see the difference in Heap dump of 'Biggest Objects' in both
>>>> clusters. How Flink 1.4 holds lesser objects in Heap. Is it because Flink
>>>> 1.4 was efficient and 1.8 solved that in efficiency and this problem is
>>>> expected?.
>>>>
>>>> @Nicko, We are not doing the fat jar stuff.
>>>>
>>>> @Piotr Nowojski <pn...@apache.org> , we are in the process of
>>>> upgrading to Java 11 and Flink 1.11. But I need at least 2 months.
>>>>
>>>>
>>>> I am not getting the Finalizer problem in the latest heap dump. Maybe
>>>> it was happening only 1 or 2 times.
>>>>
>>>> Please let me know if you need additional input
>>>>
>>>>
>>>> Thanks,
>>>> Josson
>>>>
>>>>
>>>> On Thu, Sep 10, 2020 at 5:19 AM Nico Kruber <nk...@apache.org> wrote:
>>>>
>>>>> What looks a bit strange to me is that with a running job, the
>>>>> SystemProcessingTimeService should actually not be collected (since it
>>>>> is
>>>>> still in use)!
>>>>>
>>>>> My guess is that something is indeed happening during that time frame
>>>>> (maybe
>>>>> job restarts?) and I would propose to check your logs for anything
>>>>> suspicious
>>>>> in there.
>>>>>
>>>>>
>>>>> When I did experiments with Beam pipelines on our platform [1], I also
>>>>> noticed, that the standard fat jars that Beam creates include Flink
>>>>> runtime
>>>>> classes it shouldn't (at least if you are submitting to a separate
>>>>> Flink
>>>>> cluster). This can cause all sorts of problems and I would recommend
>>>>> removing
>>>>> those from the fat jar as documented in [1].
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Nico
>>>>>
>>>>>
>>>>>
>>>>> [1] https://ververica.zendesk.com/hc/en-us/articles/360014323099
>>>>>
>>>>> On Thursday, 10 September 2020 13:44:32 CEST Piotr Nowojski wrote:
>>>>> > Hi Josson,
>>>>> >
>>>>> > Thanks again for the detailed answer, and sorry that I can not help
>>>>> you
>>>>> > with some immediate answer. I presume that jvm args for 1.8 are the
>>>>> same?
>>>>> >
>>>>> > Can you maybe post what exactly has crashed in your cases a) and b)?
>>>>> > Re c), in the previously attached word document, it looks like Flink
>>>>> was
>>>>> > running without problems for a couple of hours/minutes, everything
>>>>> was
>>>>> > stable, no signs of growing memory consumption, impending problem,
>>>>> until
>>>>> > around 23:15, when the problem started, right? Has something else
>>>>> happened
>>>>> > at that time, something that could explain the spike? A checkpoint?
>>>>> Job
>>>>> > crash/restart? Load spike?
>>>>> >
>>>>> > A couple of other random guesses:
>>>>> > - have you been monitoring other memory pools for Flink 1.4 and 1.8?
>>>>> Like
>>>>> > meta space? Growing meta space size can sometimes cause problems. It
>>>>> > shouldn't be the case here, as you configured XX:MaxMetaspaceSize,
>>>>> but it
>>>>> > might be still worth checking...
>>>>> > - another random idea, have you tried upgrading JDK? Maybe that
>>>>> would solve
>>>>> > the problem?
>>>>> >
>>>>> > Best regards,
>>>>> > Piotrek
>>>>> >
>>>>> > śr., 9 wrz 2020 o 19:53 Josson Paul <jo...@gmail.com>
>>>>> napisał(a):
>>>>> > > Hi Piotr,
>>>>> > >
>>>>> > >  *JVM start up for Flink 1.4*
>>>>> > >
>>>>> > > *-------------------------------*
>>>>> > >
>>>>> > >
>>>>> java-server-XX:HeapDumpPath=/opt/maglev/srv/diagnostics/pipelineruntime-ta
>>>>> > > skmgr-assurance-1-77d44cf64-z8gd4.heapdump-
>>>>> > > *Xmx6554m-Xms6554m*-*XX:MaxMetaspaceSize=512m*
>>>>> > > -XX:+HeapDumpOnOutOfMemoryError-*XX:+UseG1GC*-XX:CICompilerCount=4
>>>>> > >
>>>>> *-XX:MaxGCPauseMillis=1000*-XX:+DisableExplicitGC-*XX:ParallelGCThreads=4*
>>>>> > > -Dsun.net.inetaddr.ttl=60-XX:OnOutOfMemoryError=kill -9
>>>>> > > %p*-Dio.netty.eventLoopThreads=3*
>>>>> > >
>>>>> -Dlog4j.configurationFile=/opt/maglev/sw/apps/pipelineruntime/resources/lo
>>>>> > >
>>>>> g4j2.xml-Dorg.apache.flink.shaded.netty4.io.netty.eventLoopThreads=3-Dnetw
>>>>> > > orkaddress.cache.ttl=120-Dnum.cores=3-
>>>>> > >
>>>>> *XX:+UseStringDeduplication-Djava.util.concurrent.ForkJoinPool.common.par
>>>>> > > allelism=3-XX:ConcGCThreads=4 *
>>>>> > >
>>>>> -Djava.library.path=/usr/local/lib-Djava.net.preferIPv4Stack=true-Dapp.di
>>>>> > >
>>>>> r=/opt/maglev/sw/apps/pipelineruntime-Dserver.name=pipelineruntime-Dlog.di
>>>>> > >
>>>>> r=/opt/maglev/var/log/pipelineruntime-cp/opt/maglev/sw/apps/javacontainer/
>>>>> > >
>>>>> resources:/opt/maglev/sw/apps/pipelineruntime/lib/*:/opt/maglev/sw/apps/pi
>>>>> > >
>>>>> pelineruntime/resources:/opt/maglev/sw/apps/javacontainer/lib/*com.cisco.m
>>>>> > > aglev.MaglevServerstartmaglev>
>>>>> > >    1.   taskmanager.memory.fraction = 0.7f (This was coming to 4.5
>>>>> GB. I
>>>>> > >    didn't know at that time that we could set memory fraction to
>>>>> zero
>>>>> > >    because
>>>>> > >    ours is a streaming job. It was  picking up the default )
>>>>> > >    2.    Network buffer pool memory was 646MB on the Heap (I think
>>>>> this
>>>>> > >    was the default based on some calculations in the Flink 1.4)
>>>>> > >    3.    G1GC region size was 4MB (Default)
>>>>> > >
>>>>> > > I tested this setup by reducing the JVM heap by *1GB.* It still
>>>>> worked
>>>>> > > perfectly with some lags here and there.
>>>>> > >
>>>>> > > *JVM start up for Flink 1.8*
>>>>> > > *------------------------------------*
>>>>> > > a) I started with the same configuration as above. Kubenetis POD
>>>>> went out
>>>>> > > of memory. At this point I realized that in Flink 1.8  network
>>>>> buffer
>>>>> > > pools
>>>>> > > are moved to native memory. Based on calculations it was coming to
>>>>> 200MB
>>>>> > > in
>>>>> > > native  memory. I increased the overall POD memory to accommodate
>>>>> the
>>>>> > > buffer pool change keeping the *heap the same*.
>>>>> > >
>>>>> > > b) Even after I modified the overall POD memory,  the POD still
>>>>> crashed.
>>>>> > > At this point I generated Flame graphs to identify the CPU/Malloc
>>>>> calls
>>>>> > > (Attached as part of the initial email). Realized that cpu usage
>>>>> of G1GC
>>>>> > > is
>>>>> > > significantly different from Flink 1.4. Now I made 2 changes
>>>>> > >
>>>>> > >    1.  taskmanager.memory.fraction = 0.01f (This will give more
>>>>> heap for
>>>>> > >    user code)
>>>>> > >    2. Increased cpu from 3 to 4 cores.
>>>>> > >
>>>>> > >         Above changes helped to hold the cluster a little longer.
>>>>> But it
>>>>> > >
>>>>> > > still crashed after sometime.
>>>>> > >
>>>>> > > c)  Now I made the below changes.
>>>>> > >
>>>>> > >    1. I came across this ->
>>>>> > >
>>>>> http://mail.openjdk.java.net/pipermail/hotspot-gc-use/2017-February/002
>>>>> > >    622.html . Now I changed the G1GC region space to *8MB *instead
>>>>> of the
>>>>> > >    default 4MB*.*
>>>>> > >    2. -XX:MaxGCPauseMillis=2000 (I even tried higher in later
>>>>> experiments)
>>>>> > >    3. Played around with G1RSetSparseRegionEntries
>>>>> > >
>>>>> > >        This helped to avoid the POD going out of memory. But the
>>>>> Old Gen
>>>>> > >
>>>>> > > heap issue was very evident now (Please see the attached word
>>>>> document).
>>>>> > >
>>>>> > >  d)  Allocated additional heap memory of *700 MB *along with the
>>>>> above
>>>>> > >
>>>>> > > changes. This also didn't help. It just prolonged the crash.  Now
>>>>> I need
>>>>> > > help from others to which direction I want to take this to .
>>>>> > >
>>>>> > > My worry is even if I upgrade to flink 1.11 this issue might still
>>>>> > > persist.
>>>>> > >
>>>>> > > I have attached a screenshot from Heap dump to show you the
>>>>> difference
>>>>> > > between Flink 1.4 and 1.8 the way HeapKeyedStateBackend is
>>>>> created. Not
>>>>> > > sure whether this change has something to do with this memory
>>>>> issue that I
>>>>> > > am facing.
>>>>> > > Name Flink-1.4.jpg for the 1.4 and Flink-1.8.jpg for 1.8
>>>>> > >
>>>>> > >
>>>>> > > Thanks,
>>>>> > > Josson
>>>>> > >
>>>>> > > On Wed, Sep 9, 2020 at 5:44 AM Piotr Nowojski <
>>>>> pnowojski@apache.org>
>>>>> > >
>>>>> > > wrote:
>>>>> > >> Hi Josson,
>>>>> > >>
>>>>> > >> Thanks for getting back.
>>>>> > >>
>>>>> > >> What are the JVM settings and in particular GC settings that you
>>>>> are
>>>>> > >> using (G1GC?)?
>>>>> > >> It could also be an issue that in 1.4 you were just slightly
>>>>> below the
>>>>> > >> threshold of GC issues, while in 1.8, something is using a bit
>>>>> more
>>>>> > >> memory,
>>>>> > >> causing the GC issues to appear? Have you tried just increasing
>>>>> the heap
>>>>> > >> size?
>>>>> > >> Have you tried to compare on the job start up, what is the usage
>>>>> and size
>>>>> > >> of JVM's memory pools with Flink 1.4 and 1.8? Maybe that can
>>>>> point us in
>>>>> > >> the right direction.
>>>>> > >>
>>>>> > >> > My understanding on back pressure is that it is not based on
>>>>> Heap
>>>>> > >>
>>>>> > >> memory but based on how fast the Network buffers are filled. Is
>>>>> this
>>>>> > >> correct?.
>>>>> > >>
>>>>> > >> > Does Flink use TCP connection to communicate between tasks if
>>>>> the tasks
>>>>> > >>
>>>>> > >> are in the same Task manager?.
>>>>> > >>
>>>>> > >> No, local input channels are being used then, but memory for
>>>>> network
>>>>> > >> buffers is assigned to tasks regardless of the fraction of local
>>>>> input
>>>>> > >> channels in the task. However with just single taskmanager and
>>>>> > >> parallelism
>>>>> > >> of 4, the amount of the memory used by the network stack should be
>>>>> > >> insignificant, at least as long as you have a reasonably sized
>>>>> job graph
>>>>> > >> (32KB * (2 * parallelism + 7) * number of tasks).
>>>>> > >>
>>>>> > >> > What I noticed in Flink 1.4 is that it doesn't read data from
>>>>> Kafka if
>>>>> > >>
>>>>> > >> there is not sufficient heap memory to process data. Somehow this
>>>>> is not
>>>>> > >> happening in Flink 1.8 and it fills the heap soon enough not to
>>>>> get
>>>>> > >> GCed/Finalized. Any change around this between Flink 1.4 and
>>>>> Flink 1.8.
>>>>> > >>
>>>>> > >> No, there were no changes in this part as far as I remember.
>>>>> Tasks when
>>>>> > >> producing records are serialising them and putting into the
>>>>> network
>>>>> > >> buffers. If there are no available network buffers, the task is
>>>>> back
>>>>> > >> pressuring and stops processing new records.
>>>>> > >>
>>>>> > >> Best regards,
>>>>> > >> Piotrek
>>>>> > >>
>>>>> > >> wt., 8 wrz 2020 o 21:51 Josson Paul <jo...@gmail.com>
>>>>> napisał(a):
>>>>> > >>> Hi Piotr,
>>>>> > >>>
>>>>> > >>>    2) SystemProcessingTimeService holds the
>>>>> HeapKeyedStateBackend and
>>>>> > >>>
>>>>> > >>> HeapKeyedStateBackend has lot of Objects and that is filling the
>>>>> Heap
>>>>> > >>>
>>>>> > >>>    3) I am not using Flink Kafka Connector. But we are using
>>>>> Apache Beam
>>>>> > >>>
>>>>> > >>> kafka connector.  There is a change in the Apache Beam version.
>>>>> But the
>>>>> > >>> kafka client we are using is the same as the one which was
>>>>> working in
>>>>> > >>> the
>>>>> > >>> other cluster where  Flink was 1.4.
>>>>> > >>>
>>>>> > >>>   *There is no change in Hardware/Java/Kafka/Kafka
>>>>> Client/Application
>>>>> > >>>
>>>>> > >>> between the cluster which is working and not working*
>>>>> > >>>
>>>>> > >>> I am aware of the memory changes and network buffer changes
>>>>> between 1.4
>>>>> > >>> and 1.8.
>>>>> > >>>
>>>>> > >>> Flink 1.4 had network buffers on Heap and 1.8 network buffers
>>>>> are on the
>>>>> > >>> native memory. I modified the Flink 1.8 code to put it back to
>>>>> Heap
>>>>> > >>> memory
>>>>> > >>> but the issue didn't get resolved.
>>>>> > >>>
>>>>> > >>> Mine is a streaming job so we set 'taskmanager.memory.fraction'
>>>>> to very
>>>>> > >>> minimal and that heap is fully available for user data.
>>>>> > >>>
>>>>> > >>> Flink 1.4 was not using Credit based Flow control and Flink 1.8
>>>>> uses
>>>>> > >>> Credit based Flow control. *Our set up has only 1 task manager
>>>>> and 4
>>>>> > >>> parallelisms*.  According to this video
>>>>> > >>>
>>>>> https://www.youtube.com/watch?v=AbqatHF3tZI&ab_channel=FlinkForward (
>>>>> > >>> *16:21*) if tasks are in same task manager,  Flink doesn't use
>>>>> Credit
>>>>> > >>> Based Flow control. Essentially no change between Flink 1.4 and
>>>>> 1.8 in
>>>>> > >>> *our
>>>>> > >>> set up*. Still I tried to change the Credit Based Flow Control
>>>>> to False
>>>>> > >>> and test my setup. The problem persists.
>>>>> > >>>
>>>>> > >>> What I noticed in Flink 1.4 is that it doesn't read data from
>>>>> Kafka if
>>>>> > >>> there is not sufficient heap memory to process data. Somehow
>>>>> this is not
>>>>> > >>> happening in Flink 1.8 and it fills the heap soon enough not to
>>>>> get
>>>>> > >>> GCed/Finalized. Any change around this between Flink 1.4 and
>>>>> Flink 1.8.
>>>>> > >>>
>>>>> > >>> My understanding on back pressure is that it is not based on
>>>>> Heap memory
>>>>> > >>> but based on how fast the Network buffers are filled. Is this
>>>>> correct?.
>>>>> > >>> Does Flink use TCP connection to communicate between tasks if
>>>>> the tasks
>>>>> > >>> are in the same Task manager?.
>>>>> > >>>
>>>>> > >>> Thanks,
>>>>> > >>> josson
>>>>> > >>>
>>>>> > >>> On Thu, Sep 3, 2020 at 12:35 PM Piotr Nowojski <
>>>>> pnowojski@apache.org>
>>>>> > >>>
>>>>> > >>> wrote:
>>>>> > >>>> Hi Josson,
>>>>> > >>>>
>>>>> > >>>> 2. Are you sure that all/vast majority of those objects are
>>>>> pointing
>>>>> > >>>> towards SystemProcessingTimeService? And is this really the
>>>>> problem of
>>>>> > >>>> those objects? Are they taking that much of the memory?
>>>>> > >>>> 3. It still could be Kafka's problem, as it's likely that
>>>>> between 1.4
>>>>> > >>>> and 1.8.x we bumped Kafka dependencies.
>>>>> > >>>>
>>>>> > >>>> Frankly if that's not some other external dependency issue, I
>>>>> would
>>>>> > >>>> expect that the problem might lie somewhere completely else.
>>>>> Flink's
>>>>> > >>>> code
>>>>> > >>>> relaying on the finalisation hasn't changed since 2015/2016. On
>>>>> the
>>>>> > >>>> other
>>>>> > >>>> hand there were quite a bit of changes between 1.4 and 1.8.x,
>>>>> some of
>>>>> > >>>> them
>>>>> > >>>> were affecting memory usage. Have you read release notes for
>>>>> versions
>>>>> > >>>> 1.5,
>>>>> > >>>> 1.6, 1.7 and 1.8? In particular both 1.5 [1] and 1.8 [2] have
>>>>> memory
>>>>> > >>>> related notes that could be addressed via configuration changes.
>>>>> > >>>>
>>>>> > >>>> Thanks,
>>>>> > >>>> Piotrek
>>>>> > >>>>
>>>>> > >>>> [1]
>>>>> > >>>>
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-not
>>>>> > >>>> es/flink-1.5.html [2]
>>>>> > >>>>
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/release-not
>>>>> > >>>> es/flink-1.8.html>>>>
>>>>> > >>>> czw., 3 wrz 2020 o 18:50 Josson Paul <jo...@gmail.com>
>>>>> napisał(a):
>>>>> > >>>>> 1) We are in the process of migrating to Flink 1.11. But it is
>>>>> going
>>>>> > >>>>> to take a while before we can make everything work with the
>>>>> latest
>>>>> > >>>>> version.
>>>>> > >>>>> Meanwhile since this is happening in production I am trying to
>>>>> solve
>>>>> > >>>>> this.
>>>>> > >>>>> 2) Finalizae class is pointing
>>>>> > >>>>> to
>>>>> > >>>>>
>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService
>>>>> > >>>>> .
>>>>> > >>>>> This class has a finalize method. I have attached spreadsheet (
>>>>> > >>>>> *Object-explorer.csv*) to give you a high level view
>>>>> > >>>>> 3) The difference between working cluster and NON working
>>>>> cluster is
>>>>> > >>>>> only on Beam and Flink. Hardware, Input message rate,
>>>>> Application
>>>>> > >>>>> jars,
>>>>> > >>>>> Kafka are all the same between those 2 clusters. Working
>>>>> cluster was
>>>>> > >>>>> with
>>>>> > >>>>> Flink 1.4 and Beam 2.4.0
>>>>> > >>>>>
>>>>> > >>>>> Any insights into this will help me to debug further
>>>>> > >>>>>
>>>>> > >>>>> Thanks,
>>>>> > >>>>> Josson
>>>>> > >>>>>
>>>>> > >>>>>
>>>>> > >>>>> On Thu, Sep 3, 2020 at 3:34 AM Piotr Nowojski <
>>>>> pnowojski@apache.org>
>>>>> > >>>>>
>>>>> > >>>>> wrote:
>>>>> > >>>>>> Hi,
>>>>> > >>>>>>
>>>>> > >>>>>> Have you tried using a more recent Flink version? 1.8.x is no
>>>>> longer
>>>>> > >>>>>> supported, and latest versions might not have this issue
>>>>> anymore.
>>>>> > >>>>>>
>>>>> > >>>>>> Secondly, have you tried backtracking those references to the
>>>>> > >>>>>> Finalizers? Assuming that Finalizer is indeed the class
>>>>> causing
>>>>> > >>>>>> problems.
>>>>> > >>>>>>
>>>>> > >>>>>> Also it may well be a non Flink issue [1].
>>>>> > >>>>>>
>>>>> > >>>>>> Best regards,
>>>>> > >>>>>> Piotrek
>>>>> > >>>>>>
>>>>> > >>>>>> [1] https://issues.apache.org/jira/browse/KAFKA-8546
>>>>> > >>>>>>
>>>>> > >>>>>> czw., 3 wrz 2020 o 04:47 Josson Paul <jo...@gmail.com>
>>>>> > >>>>>>
>>>>> > >>>>>> napisał(a):
>>>>> > >>>>>>> Hi All,
>>>>> > >>>>>>>
>>>>> > >>>>>>> *ISSUE*
>>>>> > >>>>>>> ------
>>>>> > >>>>>>> Flink application runs for sometime and suddenly the CPU
>>>>> shoots up
>>>>> > >>>>>>> and touches the peak, POD memory reaches to the peak, GC
>>>>> count
>>>>> > >>>>>>> increases,
>>>>> > >>>>>>> Old-gen spaces reach close to 100%. Full GC doesn't clean up
>>>>> heap
>>>>> > >>>>>>> space. At
>>>>> > >>>>>>> this point I stopped sending the data and cancelled the
>>>>> Flink Jobs.
>>>>> > >>>>>>> Still
>>>>> > >>>>>>> the Old-Gen space doesn't come down. I took a heap dump and
>>>>> can see
>>>>> > >>>>>>> that
>>>>> > >>>>>>> lot of Objects in the java.lang.Finalizer class. I have
>>>>> attached the
>>>>> > >>>>>>> details in a word document. I do have the heap dump but it
>>>>> is close
>>>>> > >>>>>>> to 2GB
>>>>> > >>>>>>> of compressed size. Is it safe to upload somewhere and share
>>>>> it
>>>>> > >>>>>>> here?.
>>>>> > >>>>>>>
>>>>> > >>>>>>> This issue doesn't happen in Flink: 1.4.0 and Beam:
>>>>> release-2.4.0
>>>>> > >>>>>>>
>>>>> > >>>>>>> *WORKING CLUSTER INFO* (Flink: 1.4.0 and Beam: release-2.4.0)
>>>>> > >>>>>>> ----------------------------------------------------
>>>>> > >>>>>>>
>>>>> > >>>>>>> Application reads from Kafka and does aggregations and
>>>>> writes into
>>>>> > >>>>>>> Kafka. Application has 5 minutes windows. Application uses
>>>>> Beam
>>>>> > >>>>>>> constructs
>>>>> > >>>>>>> to build the pipeline. To read and write we use Beam
>>>>> connectors.
>>>>> > >>>>>>>
>>>>> > >>>>>>> Flink version: 1.4.0
>>>>> > >>>>>>> Beam version: release-2.4.0
>>>>> > >>>>>>> Backend State: State backend is in the Heap and check
>>>>> pointing
>>>>> > >>>>>>> happening to the distributed File System.
>>>>> > >>>>>>>
>>>>> > >>>>>>> No of task Managers: 1
>>>>> > >>>>>>> Heap: 6.4 GB
>>>>> > >>>>>>> CPU: 4 Cores
>>>>> > >>>>>>> Standalone cluster deployment on a Kubernetes pod
>>>>> > >>>>>>>
>>>>> > >>>>>>> *NOT WORKING CLUSTER INFO* (Flink version: 1.8.3 and Beam
>>>>> version:
>>>>> > >>>>>>> release-2.15.0)
>>>>> > >>>>>>> ----------
>>>>> > >>>>>>> Application details are same as above
>>>>> > >>>>>>>
>>>>> > >>>>>>> *No change in application and the rate at which data is
>>>>> injected.
>>>>> > >>>>>>> But change in Flink and Beam versions*
>>>>> > >>>>>>>
>>>>> > >>>>>>>
>>>>> > >>>>>>> Flink version: 1.8.3
>>>>> > >>>>>>> Beam version: release-2.15.0
>>>>> > >>>>>>> Backend State: State backend is in the Heap and check
>>>>> pointing
>>>>> > >>>>>>> happening to the distributed File System.
>>>>> > >>>>>>>
>>>>> > >>>>>>> No of task Managers: 1
>>>>> > >>>>>>> Heap: 6.5 GB
>>>>> > >>>>>>> CPU: 4 Cores
>>>>> > >>>>>>>
>>>>> > >>>>>>> Deployment: Standalone cluster deployment on a Kubernetes pod
>>>>> > >>>>>>>
>>>>> > >>>>>>> My Observations
>>>>> > >>>>>>> -------------
>>>>> > >>>>>>>
>>>>> > >>>>>>> 1) CPU flame graph shows that in the working version, the
>>>>> cpu time
>>>>> > >>>>>>> on GC is lesser compared to non-working version (Please see
>>>>> the
>>>>> > >>>>>>> attached
>>>>> > >>>>>>> Flame Graph. *CPU-flame-WORKING.svg* for working cluster and
>>>>> > >>>>>>> *CPU-flame-NOT-working.svg*)
>>>>> > >>>>>>>
>>>>> > >>>>>>> 2) I have attached the flame graph for native memory MALLOC
>>>>> calls
>>>>> > >>>>>>> when the issue was happening. Please find the attached SVG
>>>>> image (
>>>>> > >>>>>>> *malloc-NOT-working.svg*). The POD memory peaks when this
>>>>> issue
>>>>> > >>>>>>> happens. For me, it looks like the GC process is requesting
>>>>> a lot of
>>>>> > >>>>>>> native
>>>>> > >>>>>>> memory.
>>>>> > >>>>>>>
>>>>> > >>>>>>> 3) When the issue is happening the GC cpu usage is very
>>>>> high. Please
>>>>> > >>>>>>> see the flame graph (*CPU-graph-at-issuetime.svg*)
>>>>> > >>>>>>>
>>>>> > >>>>>>> Note: SVG file can be opened using any browser and it is
>>>>> clickable
>>>>> > >>>>>>> while opened.
>>>>> > >>>>>>> --
>>>>> > >>>>>>> Thanks
>>>>> > >>>>>>> Josson
>>>>> > >>>>>
>>>>> > >>>>> --
>>>>> > >>>>> Thanks
>>>>> > >>>>> Josson
>>>>> > >>>
>>>>> > >>> --
>>>>> > >>> Thanks
>>>>> > >>> Josson
>>>>> > >
>>>>> > > --
>>>>> > > Thanks
>>>>> > > Josson
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>>> Thanks
>>>> Josson
>>>>
>>>
>>
>> --
>> Thanks
>> Josson
>>
>

-- 
Thanks
Josson

Re: Flink 1.8.3 GC issues

Posted by Aljoscha Krettek <al...@apache.org>.
Created an issue for this: https://issues.apache.org/jira/browse/BEAM-11251

On 11.11.20 19:09, Aljoscha Krettek wrote:
> Hi,
> 
> nice work on debugging this!
> 
> We need the synchronized block in the source because the call to 
> reader.advance() (via the invoker) and reader.getCurrent() (via 
> emitElement()) must be atomic with respect to state. We cannot advance 
> the reader state, not emit that record but still checkpoint the new 
> reader state. The monitor ensures that no checkpoint can happen in 
> between those to calls.
> 
> The basic problem is now that we starve checkpointing because the 
> monitor/lock is not fair. This could be solved by using a fair lock but 
> that would require Flink proper to be changed to use a fair lock instead 
> of a monitor/synchronized. I don't see this as an immediate solution.
> 
> One thing that exacerbates this problem is that too many things are 
> happening "under" the synchronized block. All the transforms before a 
> shuffle/rebalance/keyBy are chained to the source, which means that they 
> are invoked from the emitElement() call. You could see this by 
> printing/logging a stacktrace in your user function that does the Redis 
> lookups.
> 
> A possible mitigation would be to disable chaining globally by inserting 
> a `flinkStreamEnv.disableOperatorChaining()` in [1].
> 
> A more surgical version would be to only disable chaining for sources. 
> I'm attaching a patch for that in case you're willing to try it out. 
> This is for latest master but it's easy enough to apply manually.
> 
> Best,
> Aljoscha
> 
> [1] 
> https://github.com/apache/beam/blob/d4923711cdd7b83175e21a6a422638ce530bc80c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L225 
> 
> 
> On 23.10.20 09:47, Piotr Nowojski wrote:
>> Hi Josson,
>>
>> Thanks for great investigation and coming back to use. Aljoscha, could 
>> you
>> help us here? It looks like you were involved in this original BEAM-3087
>> issue.
>>
>> Best,
>> Piotrek
>>
>> pt., 23 paź 2020 o 07:36 Josson Paul <jo...@gmail.com> napisał(a):
>>
>>> @Piotr Nowojski <pn...@apache.org>  @Nico Kruber 
>>> <nk...@apache.org>
>>>
>>> An update.
>>>
>>> I am able to figure out the problem code. A change in the Apache Beam 
>>> code
>>> is causing this problem.
>>>
>>>
>>>
>>>
>>>
>>> Beam introduced a lock on the “emit” in Unbounded Source. The lock is on
>>> the Flink’s check point lock. Now the same lock is used by Flink’s timer
>>> service to emit the Watermarks. Flink’s timer service is starved to get
>>> hold of the lock and for some reason it never gets that lock. 
>>> Aftereffect
>>>   of this situation is that the ‘WaterMark’ is never emitted by Flink’s
>>> timer service.  Because there is no Watermarks flowing through the 
>>> system,
>>> Sliding Windows are never closed. Data gets accumulated in the Window.
>>>
>>>
>>>
>>> This problem occurs only if we have external lookup calls (like Redis)
>>> happen before the data goes to Sliding Window. Something like below.
>>>
>>>
>>>
>>> KafkaSource à Transforms (Occasional Redis
>>> lookup)->SlidingWindow->Transforms->Kafka Sink
>>>
>>>
>>>
>>>
>>>
>>>
>>> https://github.com/apache/beam/blob/60e0a22ea95921636c392b5aae77cb48196dd700/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L256 
>>>
>>> . This is Beam 2.4 and you can see that there is no synchronized 
>>> block at
>>> line 257 and 270.
>>>
>>>
>>>
>>>
>>> https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L264 
>>>
>>> . This is Beam 2.15. See the synchronized block introduced in line 
>>> 264 and
>>> 280. We are using Beam 2.15 and Flink 1.8.
>>>
>>>
>>>
>>> Beam introduced this synchronized block because of this bug.
>>> https://issues.apache.org/jira/browse/BEAM-3087
>>>
>>>
>>>
>>> After I removed that synchronized keyword everything started working 
>>> fine
>>> in my application.
>>>
>>>
>>>
>>> What do you guys think about this?. Why does Beam need a Synchronized
>>> block there?
>>>
>>>
>>>
>>> Beam is using this lock ->
>>>
>>>
>>> https://github.com/apache/flink/blob/d54807ba10d0392a60663f030f9fe0bfa1c66754/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L282 
>>>
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Josson
>>>
>>> On Mon, Sep 14, 2020 at 5:03 AM Piotr Nowojski <pn...@apache.org>
>>> wrote:
>>>
>>>> Hi Josson,
>>>>
>>>> The TM logs that you attached are only from a 5 minutes time period. 
>>>> Are
>>>> you sure they are encompassing the period before the potential 
>>>> failure and
>>>> after the potential failure? It would be also nice if you would 
>>>> provide the
>>>> logs matching to the charts (like the one you were providing in the
>>>> previous messages), to correlate events (spike in latency/GC with some
>>>> timestamp from the logs).
>>>>
>>>> I was not asking necessarily to upgrade to Java9, but an updated/bug
>>>> fixed version of Java8 [1].
>>>>
>>>>> 1) In Flink 1.4 set up, the data in the Heap is throttled. It never
>>>> goes out of memory whatever be the ingestion rate. our Windows are 5
>>>> minutes windows.
>>>>> 2) In Flink 1.8 set up, HeapKeyedStateBackend is never throttled and
>>>> fills up fast. When Old-gen space goes beyond 60-70% even the Mixed 
>>>> GC or
>>>> Full GC doesn't reclaim space.
>>>>
>>>> In both cases there is the same mechanism for the backpressure. If a
>>>> task's output runs out of buffers to put produced records, it will 
>>>> block
>>>> the task. It can be that between 1.4 and 1.8, with credit based flow
>>>> control changes, the amount of available buffers for the tasks on your
>>>> setup has grown, so the tasks are backpressuring later. This in turn 
>>>> can
>>>> sometimes mean that at any point of time there is more data buffered 
>>>> on the
>>>> operator's state, like `WindowOperator`. I'm not sure what's the
>>>> best/easiest way how to check this:
>>>>
>>>> 1. the amount of buffered data might be visible via metrics [2][3]
>>>> 2. if you enable DEBUG logs, it should be visible via:
>>>>
>>>>> LOG.debug("Using a local buffer pool with {}-{} buffers",
>>>> numberOfRequiredMemorySegments, maxNumberOfMemorySegments);
>>>>
>>>> entry logged by
>>>> `org.apache.flink.runtime.io.network.buffer.LocalBufferPool`.
>>>>
>>>> Piotrek
>>>>
>>>> [1] https://en.wikipedia.org/wiki/Java_version_history#Java_8_updates
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#network 
>>>>
>>>> [3]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#network 
>>>>
>>>>
>>>> pon., 14 wrz 2020 o 05:04 Josson Paul <jo...@gmail.com> 
>>>> napisał(a):
>>>>
>>>>> @Piotr Nowojski <pn...@apache.org> @Nico Kruber 
>>>>> <nk...@apache.org>
>>>>> I have attached the  Taskmanager/GC/thread dumps in a zip file.
>>>>>
>>>>> I don't see any issues in the TM logs.
>>>>> Tried to upgrade to Java 9. Flink is on top of another platform which
>>>>> threw errors while upgrading to Java 9. I can't do much for now. We 
>>>>> will
>>>>> upgrade to Jdk 11 in another 2 months.
>>>>>
>>>>> Regarding the Heap size. The new experiment I did was on 4gb Heap on
>>>>> both Flink 1.4 and Flink 1.8.
>>>>>
>>>>> Questions I am trying to get answered are
>>>>>
>>>>> 1) In Flink 1.4 set up, the data in the Heap is throttled. It never 
>>>>> goes
>>>>> out of memory whatever be the ingestion rate. our Windows are 5
>>>>> minutes windows.
>>>>> 2) In Flink 1.8 set up, HeapKeyedStateBackend is never throttled and
>>>>> fills up fast. When Old-gen space goes beyond 60-70% even the Mixed 
>>>>> GC or
>>>>> Full GC doesn't reclaim space.
>>>>>
>>>>>
>>>>> On Fri, Sep 11, 2020 at 12:58 AM Piotr Nowojski <pn...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Josson,
>>>>>>
>>>>>> Have you checked the logs as Nico suggested? At 18:55 there is a 
>>>>>> dip in
>>>>>> non-heap memory, just about when the problems started happening. 
>>>>>> Maybe you
>>>>>> could post the TM logs?
>>>>>> Have you tried updating JVM to a newer version?
>>>>>> Also it looks like the heap size is the same between 1.4 and 1.8, but
>>>>>> in an earlier message you said you increased it by 700MB?
>>>>>>
>>>>>> Piotrek
>>>>>>
>>>>>> pt., 11 wrz 2020 o 05:07 Josson Paul <jo...@gmail.com> 
>>>>>> napisał(a):
>>>>>>
>>>>>>> I have attached two word documents.
>>>>>>> Flink1.4 and Flink1.8
>>>>>>> I reduced the heap size in the cluster and tried the experiment in
>>>>>>> both Flink 1.4 and Flink 1.8.
>>>>>>> My goal was to simulate ingestion rate of 200 Clients/sec (Not going
>>>>>>> into the details here).
>>>>>>>
>>>>>>> In Flink 1.4 I could reach 200 Clients/Sec and I ran the cluster 
>>>>>>> for 1
>>>>>>> hour. You can see the details in the attached Flink1.4 document 
>>>>>>> file. You
>>>>>>> can see the GC activity and Cpu. Both are holding good.
>>>>>>>
>>>>>>> In Flin 1.8 I could reach only 160 Clients/Sec and the issue started
>>>>>>> happening. Issue started within 15 minutes of starting the 
>>>>>>> ingestion. @Piotr
>>>>>>> Nowojski <pn...@apache.org> , you can see that there is no meta
>>>>>>> space related issue. All the GC related details are available in 
>>>>>>> the doc.
>>>>>>>
>>>>>>> Especially see the difference in Heap dump of 'Biggest Objects' in
>>>>>>> both clusters. How Flink 1.4 holds lesser objects in Heap. Is it 
>>>>>>> because
>>>>>>> Flink 1.4 was efficient and 1.8 solved that in efficiency and 
>>>>>>> this problem
>>>>>>> is expected?.
>>>>>>>
>>>>>>> @Nicko, We are not doing the fat jar stuff.
>>>>>>>
>>>>>>> @Piotr Nowojski <pn...@apache.org> , we are in the process of
>>>>>>> upgrading to Java 11 and Flink 1.11. But I need at least 2 months.
>>>>>>>
>>>>>>>
>>>>>>> I am not getting the Finalizer problem in the latest heap dump. 
>>>>>>> Maybe
>>>>>>> it was happening only 1 or 2 times.
>>>>>>>
>>>>>>> Please let me know if you need additional input
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Josson
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Sep 10, 2020 at 5:19 AM Nico Kruber <nk...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> What looks a bit strange to me is that with a running job, the
>>>>>>>> SystemProcessingTimeService should actually not be collected (since
>>>>>>>> it is
>>>>>>>> still in use)!
>>>>>>>>
>>>>>>>> My guess is that something is indeed happening during that time 
>>>>>>>> frame
>>>>>>>> (maybe
>>>>>>>> job restarts?) and I would propose to check your logs for anything
>>>>>>>> suspicious
>>>>>>>> in there.
>>>>>>>>
>>>>>>>>
>>>>>>>> When I did experiments with Beam pipelines on our platform [1], I
>>>>>>>> also
>>>>>>>> noticed, that the standard fat jars that Beam creates include Flink
>>>>>>>> runtime
>>>>>>>> classes it shouldn't (at least if you are submitting to a separate
>>>>>>>> Flink
>>>>>>>> cluster). This can cause all sorts of problems and I would 
>>>>>>>> recommend
>>>>>>>> removing
>>>>>>>> those from the fat jar as documented in [1].
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Nico
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> [1] https://ververica.zendesk.com/hc/en-us/articles/360014323099
>>>>>>>>
>>>>>>>> On Thursday, 10 September 2020 13:44:32 CEST Piotr Nowojski wrote:
>>>>>>>>> Hi Josson,
>>>>>>>>>
>>>>>>>>> Thanks again for the detailed answer, and sorry that I can not 
>>>>>>>>> help
>>>>>>>> you
>>>>>>>>> with some immediate answer. I presume that jvm args for 1.8 are 
>>>>>>>>> the
>>>>>>>> same?
>>>>>>>>>
>>>>>>>>> Can you maybe post what exactly has crashed in your cases a) 
>>>>>>>>> and b)?
>>>>>>>>> Re c), in the previously attached word document, it looks like
>>>>>>>> Flink was
>>>>>>>>> running without problems for a couple of hours/minutes, everything
>>>>>>>> was
>>>>>>>>> stable, no signs of growing memory consumption, impending problem,
>>>>>>>> until
>>>>>>>>> around 23:15, when the problem started, right? Has something else
>>>>>>>> happened
>>>>>>>>> at that time, something that could explain the spike? A 
>>>>>>>>> checkpoint?
>>>>>>>> Job
>>>>>>>>> crash/restart? Load spike?
>>>>>>>>>
>>>>>>>>> A couple of other random guesses:
>>>>>>>>> - have you been monitoring other memory pools for Flink 1.4 and
>>>>>>>> 1.8? Like
>>>>>>>>> meta space? Growing meta space size can sometimes cause 
>>>>>>>>> problems. It
>>>>>>>>> shouldn't be the case here, as you configured XX:MaxMetaspaceSize,
>>>>>>>> but it
>>>>>>>>> might be still worth checking...
>>>>>>>>> - another random idea, have you tried upgrading JDK? Maybe that
>>>>>>>> would solve
>>>>>>>>> the problem?
>>>>>>>>>
>>>>>>>>> Best regards,
>>>>>>>>> Piotrek
>>>>>>>>>
>>>>>>>>> śr., 9 wrz 2020 o 19:53 Josson Paul <jo...@gmail.com>
>>>>>>>> napisał(a):
>>>>>>>>>> Hi Piotr,
>>>>>>>>>>
>>>>>>>>>>   *JVM start up for Flink 1.4*
>>>>>>>>>>
>>>>>>>>>> *-------------------------------*
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>> java-server-XX:HeapDumpPath=/opt/maglev/srv/diagnostics/pipelineruntime-ta 
>>>>>>>>
>>>>>>>>>> skmgr-assurance-1-77d44cf64-z8gd4.heapdump-
>>>>>>>>>> *Xmx6554m-Xms6554m*-*XX:MaxMetaspaceSize=512m*
>>>>>>>>>> -XX:+HeapDumpOnOutOfMemoryError-*XX:+UseG1GC*-XX:CICompilerCount=4 
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>> *-XX:MaxGCPauseMillis=1000*-XX:+DisableExplicitGC-*XX:ParallelGCThreads=4* 
>>>>>>>>
>>>>>>>>>> -Dsun.net.inetaddr.ttl=60-XX:OnOutOfMemoryError=kill -9
>>>>>>>>>> %p*-Dio.netty.eventLoopThreads=3*
>>>>>>>>>>
>>>>>>>> -Dlog4j.configurationFile=/opt/maglev/sw/apps/pipelineruntime/resources/lo 
>>>>>>>>
>>>>>>>>>>
>>>>>>>> g4j2.xml-Dorg.apache.flink.shaded.netty4.io.netty.eventLoopThreads=3-Dnetw 
>>>>>>>>
>>>>>>>>>> orkaddress.cache.ttl=120-Dnum.cores=3-
>>>>>>>>>>
>>>>>>>> *XX:+UseStringDeduplication-Djava.util.concurrent.ForkJoinPool.common.par 
>>>>>>>>
>>>>>>>>>> allelism=3-XX:ConcGCThreads=4 *
>>>>>>>>>>
>>>>>>>> -Djava.library.path=/usr/local/lib-Djava.net.preferIPv4Stack=true-Dapp.di 
>>>>>>>>
>>>>>>>>>>
>>>>>>>> r=/opt/maglev/sw/apps/pipelineruntime-Dserver.name=pipelineruntime-Dlog.di 
>>>>>>>>
>>>>>>>>>>
>>>>>>>> r=/opt/maglev/var/log/pipelineruntime-cp/opt/maglev/sw/apps/javacontainer/ 
>>>>>>>>
>>>>>>>>>>
>>>>>>>> resources:/opt/maglev/sw/apps/pipelineruntime/lib/*:/opt/maglev/sw/apps/pi 
>>>>>>>>
>>>>>>>>>>
>>>>>>>> pelineruntime/resources:/opt/maglev/sw/apps/javacontainer/lib/*com.cisco.m 
>>>>>>>>
>>>>>>>>>> aglev.MaglevServerstartmaglev>
>>>>>>>>>>     1.   taskmanager.memory.fraction = 0.7f (This was coming to
>>>>>>>> 4.5 GB. I
>>>>>>>>>>     didn't know at that time that we could set memory fraction to
>>>>>>>> zero
>>>>>>>>>>     because
>>>>>>>>>>     ours is a streaming job. It was  picking up the default )
>>>>>>>>>>     2.    Network buffer pool memory was 646MB on the Heap (I
>>>>>>>> think this
>>>>>>>>>>     was the default based on some calculations in the Flink 1.4)
>>>>>>>>>>     3.    G1GC region size was 4MB (Default)
>>>>>>>>>>
>>>>>>>>>> I tested this setup by reducing the JVM heap by *1GB.* It still
>>>>>>>> worked
>>>>>>>>>> perfectly with some lags here and there.
>>>>>>>>>>
>>>>>>>>>> *JVM start up for Flink 1.8*
>>>>>>>>>> *------------------------------------*
>>>>>>>>>> a) I started with the same configuration as above. Kubenetis POD
>>>>>>>> went out
>>>>>>>>>> of memory. At this point I realized that in Flink 1.8  network
>>>>>>>> buffer
>>>>>>>>>> pools
>>>>>>>>>> are moved to native memory. Based on calculations it was coming
>>>>>>>> to 200MB
>>>>>>>>>> in
>>>>>>>>>> native  memory. I increased the overall POD memory to accommodate
>>>>>>>> the
>>>>>>>>>> buffer pool change keeping the *heap the same*.
>>>>>>>>>>
>>>>>>>>>> b) Even after I modified the overall POD memory,  the POD still
>>>>>>>> crashed.
>>>>>>>>>> At this point I generated Flame graphs to identify the CPU/Malloc
>>>>>>>> calls
>>>>>>>>>> (Attached as part of the initial email). Realized that cpu usage
>>>>>>>> of G1GC
>>>>>>>>>> is
>>>>>>>>>> significantly different from Flink 1.4. Now I made 2 changes
>>>>>>>>>>
>>>>>>>>>>     1.  taskmanager.memory.fraction = 0.01f (This will give more
>>>>>>>> heap for
>>>>>>>>>>     user code)
>>>>>>>>>>     2. Increased cpu from 3 to 4 cores.
>>>>>>>>>>
>>>>>>>>>>          Above changes helped to hold the cluster a little 
>>>>>>>>>> longer.
>>>>>>>> But it
>>>>>>>>>>
>>>>>>>>>> still crashed after sometime.
>>>>>>>>>>
>>>>>>>>>> c)  Now I made the below changes.
>>>>>>>>>>
>>>>>>>>>>     1. I came across this ->
>>>>>>>>>>
>>>>>>>> http://mail.openjdk.java.net/pipermail/hotspot-gc-use/2017-February/002 
>>>>>>>>
>>>>>>>>>>     622.html . Now I changed the G1GC region space to *8MB
>>>>>>>> *instead of the
>>>>>>>>>>     default 4MB*.*
>>>>>>>>>>     2. -XX:MaxGCPauseMillis=2000 (I even tried higher in later
>>>>>>>> experiments)
>>>>>>>>>>     3. Played around with G1RSetSparseRegionEntries
>>>>>>>>>>
>>>>>>>>>>         This helped to avoid the POD going out of memory. But the
>>>>>>>> Old Gen
>>>>>>>>>>
>>>>>>>>>> heap issue was very evident now (Please see the attached word
>>>>>>>> document).
>>>>>>>>>>
>>>>>>>>>>   d)  Allocated additional heap memory of *700 MB *along with the
>>>>>>>> above
>>>>>>>>>>
>>>>>>>>>> changes. This also didn't help. It just prolonged the crash.  Now
>>>>>>>> I need
>>>>>>>>>> help from others to which direction I want to take this to .
>>>>>>>>>>
>>>>>>>>>> My worry is even if I upgrade to flink 1.11 this issue might 
>>>>>>>>>> still
>>>>>>>>>> persist.
>>>>>>>>>>
>>>>>>>>>> I have attached a screenshot from Heap dump to show you the
>>>>>>>> difference
>>>>>>>>>> between Flink 1.4 and 1.8 the way HeapKeyedStateBackend is
>>>>>>>> created. Not
>>>>>>>>>> sure whether this change has something to do with this memory
>>>>>>>> issue that I
>>>>>>>>>> am facing.
>>>>>>>>>> Name Flink-1.4.jpg for the 1.4 and Flink-1.8.jpg for 1.8
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Josson
>>>>>>>>>>
>>>>>>>>>> On Wed, Sep 9, 2020 at 5:44 AM Piotr Nowojski <
>>>>>>>> pnowojski@apache.org>
>>>>>>>>>>
>>>>>>>>>> wrote:
>>>>>>>>>>> Hi Josson,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for getting back.
>>>>>>>>>>>
>>>>>>>>>>> What are the JVM settings and in particular GC settings that you
>>>>>>>> are
>>>>>>>>>>> using (G1GC?)?
>>>>>>>>>>> It could also be an issue that in 1.4 you were just slightly
>>>>>>>> below the
>>>>>>>>>>> threshold of GC issues, while in 1.8, something is using a bit
>>>>>>>> more
>>>>>>>>>>> memory,
>>>>>>>>>>> causing the GC issues to appear? Have you tried just increasing
>>>>>>>> the heap
>>>>>>>>>>> size?
>>>>>>>>>>> Have you tried to compare on the job start up, what is the usage
>>>>>>>> and size
>>>>>>>>>>> of JVM's memory pools with Flink 1.4 and 1.8? Maybe that can
>>>>>>>> point us in
>>>>>>>>>>> the right direction.
>>>>>>>>>>>
>>>>>>>>>>>> My understanding on back pressure is that it is not based on
>>>>>>>> Heap
>>>>>>>>>>>
>>>>>>>>>>> memory but based on how fast the Network buffers are filled. Is
>>>>>>>> this
>>>>>>>>>>> correct?.
>>>>>>>>>>>
>>>>>>>>>>>> Does Flink use TCP connection to communicate between tasks if
>>>>>>>> the tasks
>>>>>>>>>>>
>>>>>>>>>>> are in the same Task manager?.
>>>>>>>>>>>
>>>>>>>>>>> No, local input channels are being used then, but memory for
>>>>>>>> network
>>>>>>>>>>> buffers is assigned to tasks regardless of the fraction of local
>>>>>>>> input
>>>>>>>>>>> channels in the task. However with just single taskmanager and
>>>>>>>>>>> parallelism
>>>>>>>>>>> of 4, the amount of the memory used by the network stack should
>>>>>>>> be
>>>>>>>>>>> insignificant, at least as long as you have a reasonably sized
>>>>>>>> job graph
>>>>>>>>>>> (32KB * (2 * parallelism + 7) * number of tasks).
>>>>>>>>>>>
>>>>>>>>>>>> What I noticed in Flink 1.4 is that it doesn't read data from
>>>>>>>> Kafka if
>>>>>>>>>>>
>>>>>>>>>>> there is not sufficient heap memory to process data. Somehow
>>>>>>>> this is not
>>>>>>>>>>> happening in Flink 1.8 and it fills the heap soon enough not to
>>>>>>>> get
>>>>>>>>>>> GCed/Finalized. Any change around this between Flink 1.4 and
>>>>>>>> Flink 1.8.
>>>>>>>>>>>
>>>>>>>>>>> No, there were no changes in this part as far as I remember.
>>>>>>>> Tasks when
>>>>>>>>>>> producing records are serialising them and putting into the
>>>>>>>> network
>>>>>>>>>>> buffers. If there are no available network buffers, the task is
>>>>>>>> back
>>>>>>>>>>> pressuring and stops processing new records.
>>>>>>>>>>>
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Piotrek
>>>>>>>>>>>
>>>>>>>>>>> wt., 8 wrz 2020 o 21:51 Josson Paul <jo...@gmail.com>
>>>>>>>> napisał(a):
>>>>>>>>>>>> Hi Piotr,
>>>>>>>>>>>>
>>>>>>>>>>>>     2) SystemProcessingTimeService holds the
>>>>>>>> HeapKeyedStateBackend and
>>>>>>>>>>>>
>>>>>>>>>>>> HeapKeyedStateBackend has lot of Objects and that is filling
>>>>>>>> the Heap
>>>>>>>>>>>>
>>>>>>>>>>>>     3) I am not using Flink Kafka Connector. But we are using
>>>>>>>> Apache Beam
>>>>>>>>>>>>
>>>>>>>>>>>> kafka connector.  There is a change in the Apache Beam version.
>>>>>>>> But the
>>>>>>>>>>>> kafka client we are using is the same as the one which was
>>>>>>>> working in
>>>>>>>>>>>> the
>>>>>>>>>>>> other cluster where  Flink was 1.4.
>>>>>>>>>>>>
>>>>>>>>>>>>    *There is no change in Hardware/Java/Kafka/Kafka
>>>>>>>> Client/Application
>>>>>>>>>>>>
>>>>>>>>>>>> between the cluster which is working and not working*
>>>>>>>>>>>>
>>>>>>>>>>>> I am aware of the memory changes and network buffer changes
>>>>>>>> between 1.4
>>>>>>>>>>>> and 1.8.
>>>>>>>>>>>>
>>>>>>>>>>>> Flink 1.4 had network buffers on Heap and 1.8 network buffers
>>>>>>>> are on the
>>>>>>>>>>>> native memory. I modified the Flink 1.8 code to put it back to
>>>>>>>> Heap
>>>>>>>>>>>> memory
>>>>>>>>>>>> but the issue didn't get resolved.
>>>>>>>>>>>>
>>>>>>>>>>>> Mine is a streaming job so we set 'taskmanager.memory.fraction'
>>>>>>>> to very
>>>>>>>>>>>> minimal and that heap is fully available for user data.
>>>>>>>>>>>>
>>>>>>>>>>>> Flink 1.4 was not using Credit based Flow control and Flink 1.8
>>>>>>>> uses
>>>>>>>>>>>> Credit based Flow control. *Our set up has only 1 task manager
>>>>>>>> and 4
>>>>>>>>>>>> parallelisms*.  According to this video
>>>>>>>>>>>>
>>>>>>>> https://www.youtube.com/watch?v=AbqatHF3tZI&ab_channel=FlinkForward 
>>>>>>>> (
>>>>>>>>>>>> *16:21*) if tasks are in same task manager,  Flink doesn't use
>>>>>>>> Credit
>>>>>>>>>>>> Based Flow control. Essentially no change between Flink 1.4 and
>>>>>>>> 1.8 in
>>>>>>>>>>>> *our
>>>>>>>>>>>> set up*. Still I tried to change the Credit Based Flow Control
>>>>>>>> to False
>>>>>>>>>>>> and test my setup. The problem persists.
>>>>>>>>>>>>
>>>>>>>>>>>> What I noticed in Flink 1.4 is that it doesn't read data from
>>>>>>>> Kafka if
>>>>>>>>>>>> there is not sufficient heap memory to process data. Somehow
>>>>>>>> this is not
>>>>>>>>>>>> happening in Flink 1.8 and it fills the heap soon enough not to
>>>>>>>> get
>>>>>>>>>>>> GCed/Finalized. Any change around this between Flink 1.4 and
>>>>>>>> Flink 1.8.
>>>>>>>>>>>>
>>>>>>>>>>>> My understanding on back pressure is that it is not based on
>>>>>>>> Heap memory
>>>>>>>>>>>> but based on how fast the Network buffers are filled. Is this
>>>>>>>> correct?.
>>>>>>>>>>>> Does Flink use TCP connection to communicate between tasks if
>>>>>>>> the tasks
>>>>>>>>>>>> are in the same Task manager?.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> josson
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Sep 3, 2020 at 12:35 PM Piotr Nowojski <
>>>>>>>> pnowojski@apache.org>
>>>>>>>>>>>>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> Hi Josson,
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2. Are you sure that all/vast majority of those objects are
>>>>>>>> pointing
>>>>>>>>>>>>> towards SystemProcessingTimeService? And is this really the
>>>>>>>> problem of
>>>>>>>>>>>>> those objects? Are they taking that much of the memory?
>>>>>>>>>>>>> 3. It still could be Kafka's problem, as it's likely that
>>>>>>>> between 1.4
>>>>>>>>>>>>> and 1.8.x we bumped Kafka dependencies.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Frankly if that's not some other external dependency issue, I
>>>>>>>> would
>>>>>>>>>>>>> expect that the problem might lie somewhere completely else.
>>>>>>>> Flink's
>>>>>>>>>>>>> code
>>>>>>>>>>>>> relaying on the finalisation hasn't changed since 2015/2016.
>>>>>>>> On the
>>>>>>>>>>>>> other
>>>>>>>>>>>>> hand there were quite a bit of changes between 1.4 and 1.8.x,
>>>>>>>> some of
>>>>>>>>>>>>> them
>>>>>>>>>>>>> were affecting memory usage. Have you read release notes for
>>>>>>>> versions
>>>>>>>>>>>>> 1.5,
>>>>>>>>>>>>> 1.6, 1.7 and 1.8? In particular both 1.5 [1] and 1.8 [2] have
>>>>>>>> memory
>>>>>>>>>>>>> related notes that could be addressed via configuration
>>>>>>>> changes.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-not 
>>>>>>>>
>>>>>>>>>>>>> es/flink-1.5.html [2]
>>>>>>>>>>>>>
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/release-not 
>>>>>>>>
>>>>>>>>>>>>> es/flink-1.8.html>>>>
>>>>>>>>>>>>> czw., 3 wrz 2020 o 18:50 Josson Paul <jo...@gmail.com>
>>>>>>>> napisał(a):
>>>>>>>>>>>>>> 1) We are in the process of migrating to Flink 1.11. But it
>>>>>>>> is going
>>>>>>>>>>>>>> to take a while before we can make everything work with the
>>>>>>>> latest
>>>>>>>>>>>>>> version.
>>>>>>>>>>>>>> Meanwhile since this is happening in production I am trying
>>>>>>>> to solve
>>>>>>>>>>>>>> this.
>>>>>>>>>>>>>> 2) Finalizae class is pointing
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>
>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService 
>>>>>>>>
>>>>>>>>>>>>>> .
>>>>>>>>>>>>>> This class has a finalize method. I have attached spreadsheet
>>>>>>>> (
>>>>>>>>>>>>>> *Object-explorer.csv*) to give you a high level view
>>>>>>>>>>>>>> 3) The difference between working cluster and NON working
>>>>>>>> cluster is
>>>>>>>>>>>>>> only on Beam and Flink. Hardware, Input message rate,
>>>>>>>> Application
>>>>>>>>>>>>>> jars,
>>>>>>>>>>>>>> Kafka are all the same between those 2 clusters. Working
>>>>>>>> cluster was
>>>>>>>>>>>>>> with
>>>>>>>>>>>>>> Flink 1.4 and Beam 2.4.0
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any insights into this will help me to debug further
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Josson
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Sep 3, 2020 at 3:34 AM Piotr Nowojski <
>>>>>>>> pnowojski@apache.org>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Have you tried using a more recent Flink version? 1.8.x is
>>>>>>>> no longer
>>>>>>>>>>>>>>> supported, and latest versions might not have this issue
>>>>>>>> anymore.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Secondly, have you tried backtracking those references to 
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> Finalizers? Assuming that Finalizer is indeed the class
>>>>>>>> causing
>>>>>>>>>>>>>>> problems.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Also it may well be a non Flink issue [1].
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/KAFKA-8546
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> czw., 3 wrz 2020 o 04:47 Josson Paul <jo...@gmail.com>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> napisał(a):
>>>>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *ISSUE*
>>>>>>>>>>>>>>>> ------
>>>>>>>>>>>>>>>> Flink application runs for sometime and suddenly the CPU
>>>>>>>> shoots up
>>>>>>>>>>>>>>>> and touches the peak, POD memory reaches to the peak, GC
>>>>>>>> count
>>>>>>>>>>>>>>>> increases,
>>>>>>>>>>>>>>>> Old-gen spaces reach close to 100%. Full GC doesn't clean
>>>>>>>> up heap
>>>>>>>>>>>>>>>> space. At
>>>>>>>>>>>>>>>> this point I stopped sending the data and cancelled the
>>>>>>>> Flink Jobs.
>>>>>>>>>>>>>>>> Still
>>>>>>>>>>>>>>>> the Old-Gen space doesn't come down. I took a heap dump and
>>>>>>>> can see
>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>> lot of Objects in the java.lang.Finalizer class. I have
>>>>>>>> attached the
>>>>>>>>>>>>>>>> details in a word document. I do have the heap dump but it
>>>>>>>> is close
>>>>>>>>>>>>>>>> to 2GB
>>>>>>>>>>>>>>>> of compressed size. Is it safe to upload somewhere and
>>>>>>>> share it
>>>>>>>>>>>>>>>> here?.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This issue doesn't happen in Flink: 1.4.0 and Beam:
>>>>>>>> release-2.4.0
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *WORKING CLUSTER INFO* (Flink: 1.4.0 and Beam:
>>>>>>>> release-2.4.0)
>>>>>>>>>>>>>>>> ----------------------------------------------------
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Application reads from Kafka and does aggregations and
>>>>>>>> writes into
>>>>>>>>>>>>>>>> Kafka. Application has 5 minutes windows. Application uses
>>>>>>>> Beam
>>>>>>>>>>>>>>>> constructs
>>>>>>>>>>>>>>>> to build the pipeline. To read and write we use Beam
>>>>>>>> connectors.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Flink version: 1.4.0
>>>>>>>>>>>>>>>> Beam version: release-2.4.0
>>>>>>>>>>>>>>>> Backend State: State backend is in the Heap and check
>>>>>>>> pointing
>>>>>>>>>>>>>>>> happening to the distributed File System.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> No of task Managers: 1
>>>>>>>>>>>>>>>> Heap: 6.4 GB
>>>>>>>>>>>>>>>> CPU: 4 Cores
>>>>>>>>>>>>>>>> Standalone cluster deployment on a Kubernetes pod
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *NOT WORKING CLUSTER INFO* (Flink version: 1.8.3 and Beam
>>>>>>>> version:
>>>>>>>>>>>>>>>> release-2.15.0)
>>>>>>>>>>>>>>>> ----------
>>>>>>>>>>>>>>>> Application details are same as above
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *No change in application and the rate at which data is
>>>>>>>> injected.
>>>>>>>>>>>>>>>> But change in Flink and Beam versions*
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Flink version: 1.8.3
>>>>>>>>>>>>>>>> Beam version: release-2.15.0
>>>>>>>>>>>>>>>> Backend State: State backend is in the Heap and check
>>>>>>>> pointing
>>>>>>>>>>>>>>>> happening to the distributed File System.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> No of task Managers: 1
>>>>>>>>>>>>>>>> Heap: 6.5 GB
>>>>>>>>>>>>>>>> CPU: 4 Cores
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Deployment: Standalone cluster deployment on a Kubernetes
>>>>>>>> pod
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> My Observations
>>>>>>>>>>>>>>>> -------------
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1) CPU flame graph shows that in the working version, the
>>>>>>>> cpu time
>>>>>>>>>>>>>>>> on GC is lesser compared to non-working version (Please see
>>>>>>>> the
>>>>>>>>>>>>>>>> attached
>>>>>>>>>>>>>>>> Flame Graph. *CPU-flame-WORKING.svg* for working cluster 
>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>> *CPU-flame-NOT-working.svg*)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2) I have attached the flame graph for native memory MALLOC
>>>>>>>> calls
>>>>>>>>>>>>>>>> when the issue was happening. Please find the attached SVG
>>>>>>>> image (
>>>>>>>>>>>>>>>> *malloc-NOT-working.svg*). The POD memory peaks when this
>>>>>>>> issue
>>>>>>>>>>>>>>>> happens. For me, it looks like the GC process is requesting
>>>>>>>> a lot of
>>>>>>>>>>>>>>>> native
>>>>>>>>>>>>>>>> memory.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 3) When the issue is happening the GC cpu usage is very
>>>>>>>> high. Please
>>>>>>>>>>>>>>>> see the flame graph (*CPU-graph-at-issuetime.svg*)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Note: SVG file can be opened using any browser and it is
>>>>>>>> clickable
>>>>>>>>>>>>>>>> while opened.
>>>>>>>>>>>>>>>> -- 
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>> Josson
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -- 
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Josson
>>>>>>>>>>>>
>>>>>>>>>>>> -- 
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Josson
>>>>>>>>>>
>>>>>>>>>> -- 
>>>>>>>>>> Thanks
>>>>>>>>>> Josson
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> -- 
>>>>>>> Thanks
>>>>>>> Josson
>>>>>>>
>>>>>>
>>>>>
>>>>> -- 
>>>>> Thanks
>>>>> Josson
>>>>>
>>>>
>>>
>>> -- 
>>> Thanks
>>> Josson
>>>
>>
> 


Re: Flink 1.8.3 GC issues

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

nice work on debugging this!

We need the synchronized block in the source because the call to 
reader.advance() (via the invoker) and reader.getCurrent() (via 
emitElement()) must be atomic with respect to state. We cannot advance 
the reader state, not emit that record but still checkpoint the new 
reader state. The monitor ensures that no checkpoint can happen in 
between those to calls.

The basic problem is now that we starve checkpointing because the 
monitor/lock is not fair. This could be solved by using a fair lock but 
that would require Flink proper to be changed to use a fair lock instead 
of a monitor/synchronized. I don't see this as an immediate solution.

One thing that exacerbates this problem is that too many things are 
happening "under" the synchronized block. All the transforms before a 
shuffle/rebalance/keyBy are chained to the source, which means that they 
are invoked from the emitElement() call. You could see this by 
printing/logging a stacktrace in your user function that does the Redis 
lookups.

A possible mitigation would be to disable chaining globally by inserting 
a `flinkStreamEnv.disableOperatorChaining()` in [1].

A more surgical version would be to only disable chaining for sources. 
I'm attaching a patch for that in case you're willing to try it out. 
This is for latest master but it's easy enough to apply manually.

Best,
Aljoscha

[1] 
https://github.com/apache/beam/blob/d4923711cdd7b83175e21a6a422638ce530bc80c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L225

On 23.10.20 09:47, Piotr Nowojski wrote:
> Hi Josson,
> 
> Thanks for great investigation and coming back to use. Aljoscha, could you
> help us here? It looks like you were involved in this original BEAM-3087
> issue.
> 
> Best,
> Piotrek
> 
> pt., 23 paź 2020 o 07:36 Josson Paul <jo...@gmail.com> napisał(a):
> 
>> @Piotr Nowojski <pn...@apache.org>  @Nico Kruber <nk...@apache.org>
>>
>> An update.
>>
>> I am able to figure out the problem code. A change in the Apache Beam code
>> is causing this problem.
>>
>>
>>
>>
>>
>> Beam introduced a lock on the “emit” in Unbounded Source. The lock is on
>> the Flink’s check point lock. Now the same lock is used by Flink’s timer
>> service to emit the Watermarks. Flink’s timer service is starved to get
>> hold of the lock and for some reason it never gets that lock. Aftereffect
>>   of this situation is that the ‘WaterMark’ is never emitted by Flink’s
>> timer service.  Because there is no Watermarks flowing through the system,
>> Sliding Windows are never closed. Data gets accumulated in the Window.
>>
>>
>>
>> This problem occurs only if we have external lookup calls (like Redis)
>> happen before the data goes to Sliding Window. Something like below.
>>
>>
>>
>> KafkaSource à Transforms (Occasional Redis
>> lookup)->SlidingWindow->Transforms->Kafka Sink
>>
>>
>>
>>
>>
>>
>> https://github.com/apache/beam/blob/60e0a22ea95921636c392b5aae77cb48196dd700/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L256
>> . This is Beam 2.4 and you can see that there is no synchronized block at
>> line 257 and 270.
>>
>>
>>
>>
>> https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L264
>> . This is Beam 2.15. See the synchronized block introduced in line 264 and
>> 280. We are using Beam 2.15 and Flink 1.8.
>>
>>
>>
>> Beam introduced this synchronized block because of this bug.
>> https://issues.apache.org/jira/browse/BEAM-3087
>>
>>
>>
>> After I removed that synchronized keyword everything started working fine
>> in my application.
>>
>>
>>
>> What do you guys think about this?. Why does Beam need a Synchronized
>> block there?
>>
>>
>>
>> Beam is using this lock ->
>>
>>
>> https://github.com/apache/flink/blob/d54807ba10d0392a60663f030f9fe0bfa1c66754/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L282
>>
>>
>>
>> Thanks,
>>
>> Josson
>>
>> On Mon, Sep 14, 2020 at 5:03 AM Piotr Nowojski <pn...@apache.org>
>> wrote:
>>
>>> Hi Josson,
>>>
>>> The TM logs that you attached are only from a 5 minutes time period. Are
>>> you sure they are encompassing the period before the potential failure and
>>> after the potential failure? It would be also nice if you would provide the
>>> logs matching to the charts (like the one you were providing in the
>>> previous messages), to correlate events (spike in latency/GC with some
>>> timestamp from the logs).
>>>
>>> I was not asking necessarily to upgrade to Java9, but an updated/bug
>>> fixed version of Java8 [1].
>>>
>>>> 1) In Flink 1.4 set up, the data in the Heap is throttled. It never
>>> goes out of memory whatever be the ingestion rate. our Windows are 5
>>> minutes windows.
>>>> 2) In Flink 1.8 set up, HeapKeyedStateBackend is never throttled and
>>> fills up fast. When Old-gen space goes beyond 60-70% even the Mixed GC or
>>> Full GC doesn't reclaim space.
>>>
>>> In both cases there is the same mechanism for the backpressure. If a
>>> task's output runs out of buffers to put produced records, it will block
>>> the task. It can be that between 1.4 and 1.8, with credit based flow
>>> control changes, the amount of available buffers for the tasks on your
>>> setup has grown, so the tasks are backpressuring later. This in turn can
>>> sometimes mean that at any point of time there is more data buffered on the
>>> operator's state, like `WindowOperator`. I'm not sure what's the
>>> best/easiest way how to check this:
>>>
>>> 1. the amount of buffered data might be visible via metrics [2][3]
>>> 2. if you enable DEBUG logs, it should be visible via:
>>>
>>>> LOG.debug("Using a local buffer pool with {}-{} buffers",
>>> numberOfRequiredMemorySegments, maxNumberOfMemorySegments);
>>>
>>> entry logged by
>>> `org.apache.flink.runtime.io.network.buffer.LocalBufferPool`.
>>>
>>> Piotrek
>>>
>>> [1] https://en.wikipedia.org/wiki/Java_version_history#Java_8_updates
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#network
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#network
>>>
>>> pon., 14 wrz 2020 o 05:04 Josson Paul <jo...@gmail.com> napisał(a):
>>>
>>>> @Piotr Nowojski <pn...@apache.org> @Nico Kruber <nk...@apache.org>
>>>> I have attached the  Taskmanager/GC/thread dumps in a zip file.
>>>>
>>>> I don't see any issues in the TM logs.
>>>> Tried to upgrade to Java 9. Flink is on top of another platform which
>>>> threw errors while upgrading to Java 9. I can't do much for now. We will
>>>> upgrade to Jdk 11 in another 2 months.
>>>>
>>>> Regarding the Heap size. The new experiment I did was on 4gb Heap on
>>>> both Flink 1.4 and Flink 1.8.
>>>>
>>>> Questions I am trying to get answered are
>>>>
>>>> 1) In Flink 1.4 set up, the data in the Heap is throttled. It never goes
>>>> out of memory whatever be the ingestion rate. our Windows are 5
>>>> minutes windows.
>>>> 2) In Flink 1.8 set up, HeapKeyedStateBackend is never throttled and
>>>> fills up fast. When Old-gen space goes beyond 60-70% even the Mixed GC or
>>>> Full GC doesn't reclaim space.
>>>>
>>>>
>>>> On Fri, Sep 11, 2020 at 12:58 AM Piotr Nowojski <pn...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Josson,
>>>>>
>>>>> Have you checked the logs as Nico suggested? At 18:55 there is a dip in
>>>>> non-heap memory, just about when the problems started happening. Maybe you
>>>>> could post the TM logs?
>>>>> Have you tried updating JVM to a newer version?
>>>>> Also it looks like the heap size is the same between 1.4 and 1.8, but
>>>>> in an earlier message you said you increased it by 700MB?
>>>>>
>>>>> Piotrek
>>>>>
>>>>> pt., 11 wrz 2020 o 05:07 Josson Paul <jo...@gmail.com> napisał(a):
>>>>>
>>>>>> I have attached two word documents.
>>>>>> Flink1.4 and Flink1.8
>>>>>> I reduced the heap size in the cluster and tried the experiment in
>>>>>> both Flink 1.4 and Flink 1.8.
>>>>>> My goal was to simulate ingestion rate of 200 Clients/sec (Not going
>>>>>> into the details here).
>>>>>>
>>>>>> In Flink 1.4 I could reach 200 Clients/Sec and I ran the cluster for 1
>>>>>> hour. You can see the details in the attached Flink1.4 document file. You
>>>>>> can see the GC activity and Cpu. Both are holding good.
>>>>>>
>>>>>> In Flin 1.8 I could reach only 160 Clients/Sec and the issue started
>>>>>> happening. Issue started within 15 minutes of starting the ingestion. @Piotr
>>>>>> Nowojski <pn...@apache.org> , you can see that there is no meta
>>>>>> space related issue. All the GC related details are available in the doc.
>>>>>>
>>>>>> Especially see the difference in Heap dump of 'Biggest Objects' in
>>>>>> both clusters. How Flink 1.4 holds lesser objects in Heap. Is it because
>>>>>> Flink 1.4 was efficient and 1.8 solved that in efficiency and this problem
>>>>>> is expected?.
>>>>>>
>>>>>> @Nicko, We are not doing the fat jar stuff.
>>>>>>
>>>>>> @Piotr Nowojski <pn...@apache.org> , we are in the process of
>>>>>> upgrading to Java 11 and Flink 1.11. But I need at least 2 months.
>>>>>>
>>>>>>
>>>>>> I am not getting the Finalizer problem in the latest heap dump. Maybe
>>>>>> it was happening only 1 or 2 times.
>>>>>>
>>>>>> Please let me know if you need additional input
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Josson
>>>>>>
>>>>>>
>>>>>> On Thu, Sep 10, 2020 at 5:19 AM Nico Kruber <nk...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> What looks a bit strange to me is that with a running job, the
>>>>>>> SystemProcessingTimeService should actually not be collected (since
>>>>>>> it is
>>>>>>> still in use)!
>>>>>>>
>>>>>>> My guess is that something is indeed happening during that time frame
>>>>>>> (maybe
>>>>>>> job restarts?) and I would propose to check your logs for anything
>>>>>>> suspicious
>>>>>>> in there.
>>>>>>>
>>>>>>>
>>>>>>> When I did experiments with Beam pipelines on our platform [1], I
>>>>>>> also
>>>>>>> noticed, that the standard fat jars that Beam creates include Flink
>>>>>>> runtime
>>>>>>> classes it shouldn't (at least if you are submitting to a separate
>>>>>>> Flink
>>>>>>> cluster). This can cause all sorts of problems and I would recommend
>>>>>>> removing
>>>>>>> those from the fat jar as documented in [1].
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Nico
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> [1] https://ververica.zendesk.com/hc/en-us/articles/360014323099
>>>>>>>
>>>>>>> On Thursday, 10 September 2020 13:44:32 CEST Piotr Nowojski wrote:
>>>>>>>> Hi Josson,
>>>>>>>>
>>>>>>>> Thanks again for the detailed answer, and sorry that I can not help
>>>>>>> you
>>>>>>>> with some immediate answer. I presume that jvm args for 1.8 are the
>>>>>>> same?
>>>>>>>>
>>>>>>>> Can you maybe post what exactly has crashed in your cases a) and b)?
>>>>>>>> Re c), in the previously attached word document, it looks like
>>>>>>> Flink was
>>>>>>>> running without problems for a couple of hours/minutes, everything
>>>>>>> was
>>>>>>>> stable, no signs of growing memory consumption, impending problem,
>>>>>>> until
>>>>>>>> around 23:15, when the problem started, right? Has something else
>>>>>>> happened
>>>>>>>> at that time, something that could explain the spike? A checkpoint?
>>>>>>> Job
>>>>>>>> crash/restart? Load spike?
>>>>>>>>
>>>>>>>> A couple of other random guesses:
>>>>>>>> - have you been monitoring other memory pools for Flink 1.4 and
>>>>>>> 1.8? Like
>>>>>>>> meta space? Growing meta space size can sometimes cause problems. It
>>>>>>>> shouldn't be the case here, as you configured XX:MaxMetaspaceSize,
>>>>>>> but it
>>>>>>>> might be still worth checking...
>>>>>>>> - another random idea, have you tried upgrading JDK? Maybe that
>>>>>>> would solve
>>>>>>>> the problem?
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>> Piotrek
>>>>>>>>
>>>>>>>> śr., 9 wrz 2020 o 19:53 Josson Paul <jo...@gmail.com>
>>>>>>> napisał(a):
>>>>>>>>> Hi Piotr,
>>>>>>>>>
>>>>>>>>>   *JVM start up for Flink 1.4*
>>>>>>>>>
>>>>>>>>> *-------------------------------*
>>>>>>>>>
>>>>>>>>>
>>>>>>> java-server-XX:HeapDumpPath=/opt/maglev/srv/diagnostics/pipelineruntime-ta
>>>>>>>>> skmgr-assurance-1-77d44cf64-z8gd4.heapdump-
>>>>>>>>> *Xmx6554m-Xms6554m*-*XX:MaxMetaspaceSize=512m*
>>>>>>>>> -XX:+HeapDumpOnOutOfMemoryError-*XX:+UseG1GC*-XX:CICompilerCount=4
>>>>>>>>>
>>>>>>> *-XX:MaxGCPauseMillis=1000*-XX:+DisableExplicitGC-*XX:ParallelGCThreads=4*
>>>>>>>>> -Dsun.net.inetaddr.ttl=60-XX:OnOutOfMemoryError=kill -9
>>>>>>>>> %p*-Dio.netty.eventLoopThreads=3*
>>>>>>>>>
>>>>>>> -Dlog4j.configurationFile=/opt/maglev/sw/apps/pipelineruntime/resources/lo
>>>>>>>>>
>>>>>>> g4j2.xml-Dorg.apache.flink.shaded.netty4.io.netty.eventLoopThreads=3-Dnetw
>>>>>>>>> orkaddress.cache.ttl=120-Dnum.cores=3-
>>>>>>>>>
>>>>>>> *XX:+UseStringDeduplication-Djava.util.concurrent.ForkJoinPool.common.par
>>>>>>>>> allelism=3-XX:ConcGCThreads=4 *
>>>>>>>>>
>>>>>>> -Djava.library.path=/usr/local/lib-Djava.net.preferIPv4Stack=true-Dapp.di
>>>>>>>>>
>>>>>>> r=/opt/maglev/sw/apps/pipelineruntime-Dserver.name=pipelineruntime-Dlog.di
>>>>>>>>>
>>>>>>> r=/opt/maglev/var/log/pipelineruntime-cp/opt/maglev/sw/apps/javacontainer/
>>>>>>>>>
>>>>>>> resources:/opt/maglev/sw/apps/pipelineruntime/lib/*:/opt/maglev/sw/apps/pi
>>>>>>>>>
>>>>>>> pelineruntime/resources:/opt/maglev/sw/apps/javacontainer/lib/*com.cisco.m
>>>>>>>>> aglev.MaglevServerstartmaglev>
>>>>>>>>>     1.   taskmanager.memory.fraction = 0.7f (This was coming to
>>>>>>> 4.5 GB. I
>>>>>>>>>     didn't know at that time that we could set memory fraction to
>>>>>>> zero
>>>>>>>>>     because
>>>>>>>>>     ours is a streaming job. It was  picking up the default )
>>>>>>>>>     2.    Network buffer pool memory was 646MB on the Heap (I
>>>>>>> think this
>>>>>>>>>     was the default based on some calculations in the Flink 1.4)
>>>>>>>>>     3.    G1GC region size was 4MB (Default)
>>>>>>>>>
>>>>>>>>> I tested this setup by reducing the JVM heap by *1GB.* It still
>>>>>>> worked
>>>>>>>>> perfectly with some lags here and there.
>>>>>>>>>
>>>>>>>>> *JVM start up for Flink 1.8*
>>>>>>>>> *------------------------------------*
>>>>>>>>> a) I started with the same configuration as above. Kubenetis POD
>>>>>>> went out
>>>>>>>>> of memory. At this point I realized that in Flink 1.8  network
>>>>>>> buffer
>>>>>>>>> pools
>>>>>>>>> are moved to native memory. Based on calculations it was coming
>>>>>>> to 200MB
>>>>>>>>> in
>>>>>>>>> native  memory. I increased the overall POD memory to accommodate
>>>>>>> the
>>>>>>>>> buffer pool change keeping the *heap the same*.
>>>>>>>>>
>>>>>>>>> b) Even after I modified the overall POD memory,  the POD still
>>>>>>> crashed.
>>>>>>>>> At this point I generated Flame graphs to identify the CPU/Malloc
>>>>>>> calls
>>>>>>>>> (Attached as part of the initial email). Realized that cpu usage
>>>>>>> of G1GC
>>>>>>>>> is
>>>>>>>>> significantly different from Flink 1.4. Now I made 2 changes
>>>>>>>>>
>>>>>>>>>     1.  taskmanager.memory.fraction = 0.01f (This will give more
>>>>>>> heap for
>>>>>>>>>     user code)
>>>>>>>>>     2. Increased cpu from 3 to 4 cores.
>>>>>>>>>
>>>>>>>>>          Above changes helped to hold the cluster a little longer.
>>>>>>> But it
>>>>>>>>>
>>>>>>>>> still crashed after sometime.
>>>>>>>>>
>>>>>>>>> c)  Now I made the below changes.
>>>>>>>>>
>>>>>>>>>     1. I came across this ->
>>>>>>>>>
>>>>>>> http://mail.openjdk.java.net/pipermail/hotspot-gc-use/2017-February/002
>>>>>>>>>     622.html . Now I changed the G1GC region space to *8MB
>>>>>>> *instead of the
>>>>>>>>>     default 4MB*.*
>>>>>>>>>     2. -XX:MaxGCPauseMillis=2000 (I even tried higher in later
>>>>>>> experiments)
>>>>>>>>>     3. Played around with G1RSetSparseRegionEntries
>>>>>>>>>
>>>>>>>>>         This helped to avoid the POD going out of memory. But the
>>>>>>> Old Gen
>>>>>>>>>
>>>>>>>>> heap issue was very evident now (Please see the attached word
>>>>>>> document).
>>>>>>>>>
>>>>>>>>>   d)  Allocated additional heap memory of *700 MB *along with the
>>>>>>> above
>>>>>>>>>
>>>>>>>>> changes. This also didn't help. It just prolonged the crash.  Now
>>>>>>> I need
>>>>>>>>> help from others to which direction I want to take this to .
>>>>>>>>>
>>>>>>>>> My worry is even if I upgrade to flink 1.11 this issue might still
>>>>>>>>> persist.
>>>>>>>>>
>>>>>>>>> I have attached a screenshot from Heap dump to show you the
>>>>>>> difference
>>>>>>>>> between Flink 1.4 and 1.8 the way HeapKeyedStateBackend is
>>>>>>> created. Not
>>>>>>>>> sure whether this change has something to do with this memory
>>>>>>> issue that I
>>>>>>>>> am facing.
>>>>>>>>> Name Flink-1.4.jpg for the 1.4 and Flink-1.8.jpg for 1.8
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Josson
>>>>>>>>>
>>>>>>>>> On Wed, Sep 9, 2020 at 5:44 AM Piotr Nowojski <
>>>>>>> pnowojski@apache.org>
>>>>>>>>>
>>>>>>>>> wrote:
>>>>>>>>>> Hi Josson,
>>>>>>>>>>
>>>>>>>>>> Thanks for getting back.
>>>>>>>>>>
>>>>>>>>>> What are the JVM settings and in particular GC settings that you
>>>>>>> are
>>>>>>>>>> using (G1GC?)?
>>>>>>>>>> It could also be an issue that in 1.4 you were just slightly
>>>>>>> below the
>>>>>>>>>> threshold of GC issues, while in 1.8, something is using a bit
>>>>>>> more
>>>>>>>>>> memory,
>>>>>>>>>> causing the GC issues to appear? Have you tried just increasing
>>>>>>> the heap
>>>>>>>>>> size?
>>>>>>>>>> Have you tried to compare on the job start up, what is the usage
>>>>>>> and size
>>>>>>>>>> of JVM's memory pools with Flink 1.4 and 1.8? Maybe that can
>>>>>>> point us in
>>>>>>>>>> the right direction.
>>>>>>>>>>
>>>>>>>>>>> My understanding on back pressure is that it is not based on
>>>>>>> Heap
>>>>>>>>>>
>>>>>>>>>> memory but based on how fast the Network buffers are filled. Is
>>>>>>> this
>>>>>>>>>> correct?.
>>>>>>>>>>
>>>>>>>>>>> Does Flink use TCP connection to communicate between tasks if
>>>>>>> the tasks
>>>>>>>>>>
>>>>>>>>>> are in the same Task manager?.
>>>>>>>>>>
>>>>>>>>>> No, local input channels are being used then, but memory for
>>>>>>> network
>>>>>>>>>> buffers is assigned to tasks regardless of the fraction of local
>>>>>>> input
>>>>>>>>>> channels in the task. However with just single taskmanager and
>>>>>>>>>> parallelism
>>>>>>>>>> of 4, the amount of the memory used by the network stack should
>>>>>>> be
>>>>>>>>>> insignificant, at least as long as you have a reasonably sized
>>>>>>> job graph
>>>>>>>>>> (32KB * (2 * parallelism + 7) * number of tasks).
>>>>>>>>>>
>>>>>>>>>>> What I noticed in Flink 1.4 is that it doesn't read data from
>>>>>>> Kafka if
>>>>>>>>>>
>>>>>>>>>> there is not sufficient heap memory to process data. Somehow
>>>>>>> this is not
>>>>>>>>>> happening in Flink 1.8 and it fills the heap soon enough not to
>>>>>>> get
>>>>>>>>>> GCed/Finalized. Any change around this between Flink 1.4 and
>>>>>>> Flink 1.8.
>>>>>>>>>>
>>>>>>>>>> No, there were no changes in this part as far as I remember.
>>>>>>> Tasks when
>>>>>>>>>> producing records are serialising them and putting into the
>>>>>>> network
>>>>>>>>>> buffers. If there are no available network buffers, the task is
>>>>>>> back
>>>>>>>>>> pressuring and stops processing new records.
>>>>>>>>>>
>>>>>>>>>> Best regards,
>>>>>>>>>> Piotrek
>>>>>>>>>>
>>>>>>>>>> wt., 8 wrz 2020 o 21:51 Josson Paul <jo...@gmail.com>
>>>>>>> napisał(a):
>>>>>>>>>>> Hi Piotr,
>>>>>>>>>>>
>>>>>>>>>>>     2) SystemProcessingTimeService holds the
>>>>>>> HeapKeyedStateBackend and
>>>>>>>>>>>
>>>>>>>>>>> HeapKeyedStateBackend has lot of Objects and that is filling
>>>>>>> the Heap
>>>>>>>>>>>
>>>>>>>>>>>     3) I am not using Flink Kafka Connector. But we are using
>>>>>>> Apache Beam
>>>>>>>>>>>
>>>>>>>>>>> kafka connector.  There is a change in the Apache Beam version.
>>>>>>> But the
>>>>>>>>>>> kafka client we are using is the same as the one which was
>>>>>>> working in
>>>>>>>>>>> the
>>>>>>>>>>> other cluster where  Flink was 1.4.
>>>>>>>>>>>
>>>>>>>>>>>    *There is no change in Hardware/Java/Kafka/Kafka
>>>>>>> Client/Application
>>>>>>>>>>>
>>>>>>>>>>> between the cluster which is working and not working*
>>>>>>>>>>>
>>>>>>>>>>> I am aware of the memory changes and network buffer changes
>>>>>>> between 1.4
>>>>>>>>>>> and 1.8.
>>>>>>>>>>>
>>>>>>>>>>> Flink 1.4 had network buffers on Heap and 1.8 network buffers
>>>>>>> are on the
>>>>>>>>>>> native memory. I modified the Flink 1.8 code to put it back to
>>>>>>> Heap
>>>>>>>>>>> memory
>>>>>>>>>>> but the issue didn't get resolved.
>>>>>>>>>>>
>>>>>>>>>>> Mine is a streaming job so we set 'taskmanager.memory.fraction'
>>>>>>> to very
>>>>>>>>>>> minimal and that heap is fully available for user data.
>>>>>>>>>>>
>>>>>>>>>>> Flink 1.4 was not using Credit based Flow control and Flink 1.8
>>>>>>> uses
>>>>>>>>>>> Credit based Flow control. *Our set up has only 1 task manager
>>>>>>> and 4
>>>>>>>>>>> parallelisms*.  According to this video
>>>>>>>>>>>
>>>>>>> https://www.youtube.com/watch?v=AbqatHF3tZI&ab_channel=FlinkForward (
>>>>>>>>>>> *16:21*) if tasks are in same task manager,  Flink doesn't use
>>>>>>> Credit
>>>>>>>>>>> Based Flow control. Essentially no change between Flink 1.4 and
>>>>>>> 1.8 in
>>>>>>>>>>> *our
>>>>>>>>>>> set up*. Still I tried to change the Credit Based Flow Control
>>>>>>> to False
>>>>>>>>>>> and test my setup. The problem persists.
>>>>>>>>>>>
>>>>>>>>>>> What I noticed in Flink 1.4 is that it doesn't read data from
>>>>>>> Kafka if
>>>>>>>>>>> there is not sufficient heap memory to process data. Somehow
>>>>>>> this is not
>>>>>>>>>>> happening in Flink 1.8 and it fills the heap soon enough not to
>>>>>>> get
>>>>>>>>>>> GCed/Finalized. Any change around this between Flink 1.4 and
>>>>>>> Flink 1.8.
>>>>>>>>>>>
>>>>>>>>>>> My understanding on back pressure is that it is not based on
>>>>>>> Heap memory
>>>>>>>>>>> but based on how fast the Network buffers are filled. Is this
>>>>>>> correct?.
>>>>>>>>>>> Does Flink use TCP connection to communicate between tasks if
>>>>>>> the tasks
>>>>>>>>>>> are in the same Task manager?.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> josson
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Sep 3, 2020 at 12:35 PM Piotr Nowojski <
>>>>>>> pnowojski@apache.org>
>>>>>>>>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>> Hi Josson,
>>>>>>>>>>>>
>>>>>>>>>>>> 2. Are you sure that all/vast majority of those objects are
>>>>>>> pointing
>>>>>>>>>>>> towards SystemProcessingTimeService? And is this really the
>>>>>>> problem of
>>>>>>>>>>>> those objects? Are they taking that much of the memory?
>>>>>>>>>>>> 3. It still could be Kafka's problem, as it's likely that
>>>>>>> between 1.4
>>>>>>>>>>>> and 1.8.x we bumped Kafka dependencies.
>>>>>>>>>>>>
>>>>>>>>>>>> Frankly if that's not some other external dependency issue, I
>>>>>>> would
>>>>>>>>>>>> expect that the problem might lie somewhere completely else.
>>>>>>> Flink's
>>>>>>>>>>>> code
>>>>>>>>>>>> relaying on the finalisation hasn't changed since 2015/2016.
>>>>>>> On the
>>>>>>>>>>>> other
>>>>>>>>>>>> hand there were quite a bit of changes between 1.4 and 1.8.x,
>>>>>>> some of
>>>>>>>>>>>> them
>>>>>>>>>>>> were affecting memory usage. Have you read release notes for
>>>>>>> versions
>>>>>>>>>>>> 1.5,
>>>>>>>>>>>> 1.6, 1.7 and 1.8? In particular both 1.5 [1] and 1.8 [2] have
>>>>>>> memory
>>>>>>>>>>>> related notes that could be addressed via configuration
>>>>>>> changes.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>>
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-not
>>>>>>>>>>>> es/flink-1.5.html [2]
>>>>>>>>>>>>
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/release-not
>>>>>>>>>>>> es/flink-1.8.html>>>>
>>>>>>>>>>>> czw., 3 wrz 2020 o 18:50 Josson Paul <jo...@gmail.com>
>>>>>>> napisał(a):
>>>>>>>>>>>>> 1) We are in the process of migrating to Flink 1.11. But it
>>>>>>> is going
>>>>>>>>>>>>> to take a while before we can make everything work with the
>>>>>>> latest
>>>>>>>>>>>>> version.
>>>>>>>>>>>>> Meanwhile since this is happening in production I am trying
>>>>>>> to solve
>>>>>>>>>>>>> this.
>>>>>>>>>>>>> 2) Finalizae class is pointing
>>>>>>>>>>>>> to
>>>>>>>>>>>>>
>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService
>>>>>>>>>>>>> .
>>>>>>>>>>>>> This class has a finalize method. I have attached spreadsheet
>>>>>>> (
>>>>>>>>>>>>> *Object-explorer.csv*) to give you a high level view
>>>>>>>>>>>>> 3) The difference between working cluster and NON working
>>>>>>> cluster is
>>>>>>>>>>>>> only on Beam and Flink. Hardware, Input message rate,
>>>>>>> Application
>>>>>>>>>>>>> jars,
>>>>>>>>>>>>> Kafka are all the same between those 2 clusters. Working
>>>>>>> cluster was
>>>>>>>>>>>>> with
>>>>>>>>>>>>> Flink 1.4 and Beam 2.4.0
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any insights into this will help me to debug further
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Josson
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Sep 3, 2020 at 3:34 AM Piotr Nowojski <
>>>>>>> pnowojski@apache.org>
>>>>>>>>>>>>>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Have you tried using a more recent Flink version? 1.8.x is
>>>>>>> no longer
>>>>>>>>>>>>>> supported, and latest versions might not have this issue
>>>>>>> anymore.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Secondly, have you tried backtracking those references to the
>>>>>>>>>>>>>> Finalizers? Assuming that Finalizer is indeed the class
>>>>>>> causing
>>>>>>>>>>>>>> problems.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also it may well be a non Flink issue [1].
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/KAFKA-8546
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> czw., 3 wrz 2020 o 04:47 Josson Paul <jo...@gmail.com>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> napisał(a):
>>>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *ISSUE*
>>>>>>>>>>>>>>> ------
>>>>>>>>>>>>>>> Flink application runs for sometime and suddenly the CPU
>>>>>>> shoots up
>>>>>>>>>>>>>>> and touches the peak, POD memory reaches to the peak, GC
>>>>>>> count
>>>>>>>>>>>>>>> increases,
>>>>>>>>>>>>>>> Old-gen spaces reach close to 100%. Full GC doesn't clean
>>>>>>> up heap
>>>>>>>>>>>>>>> space. At
>>>>>>>>>>>>>>> this point I stopped sending the data and cancelled the
>>>>>>> Flink Jobs.
>>>>>>>>>>>>>>> Still
>>>>>>>>>>>>>>> the Old-Gen space doesn't come down. I took a heap dump and
>>>>>>> can see
>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>> lot of Objects in the java.lang.Finalizer class. I have
>>>>>>> attached the
>>>>>>>>>>>>>>> details in a word document. I do have the heap dump but it
>>>>>>> is close
>>>>>>>>>>>>>>> to 2GB
>>>>>>>>>>>>>>> of compressed size. Is it safe to upload somewhere and
>>>>>>> share it
>>>>>>>>>>>>>>> here?.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This issue doesn't happen in Flink: 1.4.0 and Beam:
>>>>>>> release-2.4.0
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *WORKING CLUSTER INFO* (Flink: 1.4.0 and Beam:
>>>>>>> release-2.4.0)
>>>>>>>>>>>>>>> ----------------------------------------------------
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Application reads from Kafka and does aggregations and
>>>>>>> writes into
>>>>>>>>>>>>>>> Kafka. Application has 5 minutes windows. Application uses
>>>>>>> Beam
>>>>>>>>>>>>>>> constructs
>>>>>>>>>>>>>>> to build the pipeline. To read and write we use Beam
>>>>>>> connectors.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Flink version: 1.4.0
>>>>>>>>>>>>>>> Beam version: release-2.4.0
>>>>>>>>>>>>>>> Backend State: State backend is in the Heap and check
>>>>>>> pointing
>>>>>>>>>>>>>>> happening to the distributed File System.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> No of task Managers: 1
>>>>>>>>>>>>>>> Heap: 6.4 GB
>>>>>>>>>>>>>>> CPU: 4 Cores
>>>>>>>>>>>>>>> Standalone cluster deployment on a Kubernetes pod
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *NOT WORKING CLUSTER INFO* (Flink version: 1.8.3 and Beam
>>>>>>> version:
>>>>>>>>>>>>>>> release-2.15.0)
>>>>>>>>>>>>>>> ----------
>>>>>>>>>>>>>>> Application details are same as above
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *No change in application and the rate at which data is
>>>>>>> injected.
>>>>>>>>>>>>>>> But change in Flink and Beam versions*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Flink version: 1.8.3
>>>>>>>>>>>>>>> Beam version: release-2.15.0
>>>>>>>>>>>>>>> Backend State: State backend is in the Heap and check
>>>>>>> pointing
>>>>>>>>>>>>>>> happening to the distributed File System.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> No of task Managers: 1
>>>>>>>>>>>>>>> Heap: 6.5 GB
>>>>>>>>>>>>>>> CPU: 4 Cores
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Deployment: Standalone cluster deployment on a Kubernetes
>>>>>>> pod
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> My Observations
>>>>>>>>>>>>>>> -------------
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1) CPU flame graph shows that in the working version, the
>>>>>>> cpu time
>>>>>>>>>>>>>>> on GC is lesser compared to non-working version (Please see
>>>>>>> the
>>>>>>>>>>>>>>> attached
>>>>>>>>>>>>>>> Flame Graph. *CPU-flame-WORKING.svg* for working cluster and
>>>>>>>>>>>>>>> *CPU-flame-NOT-working.svg*)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2) I have attached the flame graph for native memory MALLOC
>>>>>>> calls
>>>>>>>>>>>>>>> when the issue was happening. Please find the attached SVG
>>>>>>> image (
>>>>>>>>>>>>>>> *malloc-NOT-working.svg*). The POD memory peaks when this
>>>>>>> issue
>>>>>>>>>>>>>>> happens. For me, it looks like the GC process is requesting
>>>>>>> a lot of
>>>>>>>>>>>>>>> native
>>>>>>>>>>>>>>> memory.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 3) When the issue is happening the GC cpu usage is very
>>>>>>> high. Please
>>>>>>>>>>>>>>> see the flame graph (*CPU-graph-at-issuetime.svg*)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Note: SVG file can be opened using any browser and it is
>>>>>>> clickable
>>>>>>>>>>>>>>> while opened.
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Josson
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Josson
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Thanks
>>>>>>>>>>> Josson
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Thanks
>>>>>>>>> Josson
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks
>>>>>> Josson
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Thanks
>>>> Josson
>>>>
>>>
>>
>> --
>> Thanks
>> Josson
>>
> 


Re: Flink 1.8.3 GC issues

Posted by Piotr Nowojski <pn...@apache.org>.
Hi Josson,

Thanks for great investigation and coming back to use. Aljoscha, could you
help us here? It looks like you were involved in this original BEAM-3087
issue.

Best,
Piotrek

pt., 23 paź 2020 o 07:36 Josson Paul <jo...@gmail.com> napisał(a):

> @Piotr Nowojski <pn...@apache.org>  @Nico Kruber <nk...@apache.org>
>
> An update.
>
> I am able to figure out the problem code. A change in the Apache Beam code
> is causing this problem.
>
>
>
>
>
> Beam introduced a lock on the “emit” in Unbounded Source. The lock is on
> the Flink’s check point lock. Now the same lock is used by Flink’s timer
> service to emit the Watermarks. Flink’s timer service is starved to get
> hold of the lock and for some reason it never gets that lock. Aftereffect
>  of this situation is that the ‘WaterMark’ is never emitted by Flink’s
> timer service.  Because there is no Watermarks flowing through the system,
> Sliding Windows are never closed. Data gets accumulated in the Window.
>
>
>
> This problem occurs only if we have external lookup calls (like Redis)
> happen before the data goes to Sliding Window. Something like below.
>
>
>
> KafkaSource à Transforms (Occasional Redis
> lookup)->SlidingWindow->Transforms->Kafka Sink
>
>
>
>
>
>
> https://github.com/apache/beam/blob/60e0a22ea95921636c392b5aae77cb48196dd700/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L256
> . This is Beam 2.4 and you can see that there is no synchronized block at
> line 257 and 270.
>
>
>
>
> https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L264
> . This is Beam 2.15. See the synchronized block introduced in line 264 and
> 280. We are using Beam 2.15 and Flink 1.8.
>
>
>
> Beam introduced this synchronized block because of this bug.
> https://issues.apache.org/jira/browse/BEAM-3087
>
>
>
> After I removed that synchronized keyword everything started working fine
> in my application.
>
>
>
> What do you guys think about this?. Why does Beam need a Synchronized
> block there?
>
>
>
> Beam is using this lock ->
>
>
> https://github.com/apache/flink/blob/d54807ba10d0392a60663f030f9fe0bfa1c66754/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L282
>
>
>
> Thanks,
>
> Josson
>
> On Mon, Sep 14, 2020 at 5:03 AM Piotr Nowojski <pn...@apache.org>
> wrote:
>
>> Hi Josson,
>>
>> The TM logs that you attached are only from a 5 minutes time period. Are
>> you sure they are encompassing the period before the potential failure and
>> after the potential failure? It would be also nice if you would provide the
>> logs matching to the charts (like the one you were providing in the
>> previous messages), to correlate events (spike in latency/GC with some
>> timestamp from the logs).
>>
>> I was not asking necessarily to upgrade to Java9, but an updated/bug
>> fixed version of Java8 [1].
>>
>> > 1) In Flink 1.4 set up, the data in the Heap is throttled. It never
>> goes out of memory whatever be the ingestion rate. our Windows are 5
>> minutes windows.
>> > 2) In Flink 1.8 set up, HeapKeyedStateBackend is never throttled and
>> fills up fast. When Old-gen space goes beyond 60-70% even the Mixed GC or
>> Full GC doesn't reclaim space.
>>
>> In both cases there is the same mechanism for the backpressure. If a
>> task's output runs out of buffers to put produced records, it will block
>> the task. It can be that between 1.4 and 1.8, with credit based flow
>> control changes, the amount of available buffers for the tasks on your
>> setup has grown, so the tasks are backpressuring later. This in turn can
>> sometimes mean that at any point of time there is more data buffered on the
>> operator's state, like `WindowOperator`. I'm not sure what's the
>> best/easiest way how to check this:
>>
>> 1. the amount of buffered data might be visible via metrics [2][3]
>> 2. if you enable DEBUG logs, it should be visible via:
>>
>> > LOG.debug("Using a local buffer pool with {}-{} buffers",
>> numberOfRequiredMemorySegments, maxNumberOfMemorySegments);
>>
>> entry logged by
>> `org.apache.flink.runtime.io.network.buffer.LocalBufferPool`.
>>
>> Piotrek
>>
>> [1] https://en.wikipedia.org/wiki/Java_version_history#Java_8_updates
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#network
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#network
>>
>> pon., 14 wrz 2020 o 05:04 Josson Paul <jo...@gmail.com> napisał(a):
>>
>>> @Piotr Nowojski <pn...@apache.org> @Nico Kruber <nk...@apache.org>
>>> I have attached the  Taskmanager/GC/thread dumps in a zip file.
>>>
>>> I don't see any issues in the TM logs.
>>> Tried to upgrade to Java 9. Flink is on top of another platform which
>>> threw errors while upgrading to Java 9. I can't do much for now. We will
>>> upgrade to Jdk 11 in another 2 months.
>>>
>>> Regarding the Heap size. The new experiment I did was on 4gb Heap on
>>> both Flink 1.4 and Flink 1.8.
>>>
>>> Questions I am trying to get answered are
>>>
>>> 1) In Flink 1.4 set up, the data in the Heap is throttled. It never goes
>>> out of memory whatever be the ingestion rate. our Windows are 5
>>> minutes windows.
>>> 2) In Flink 1.8 set up, HeapKeyedStateBackend is never throttled and
>>> fills up fast. When Old-gen space goes beyond 60-70% even the Mixed GC or
>>> Full GC doesn't reclaim space.
>>>
>>>
>>> On Fri, Sep 11, 2020 at 12:58 AM Piotr Nowojski <pn...@apache.org>
>>> wrote:
>>>
>>>> Hi Josson,
>>>>
>>>> Have you checked the logs as Nico suggested? At 18:55 there is a dip in
>>>> non-heap memory, just about when the problems started happening. Maybe you
>>>> could post the TM logs?
>>>> Have you tried updating JVM to a newer version?
>>>> Also it looks like the heap size is the same between 1.4 and 1.8, but
>>>> in an earlier message you said you increased it by 700MB?
>>>>
>>>> Piotrek
>>>>
>>>> pt., 11 wrz 2020 o 05:07 Josson Paul <jo...@gmail.com> napisał(a):
>>>>
>>>>> I have attached two word documents.
>>>>> Flink1.4 and Flink1.8
>>>>> I reduced the heap size in the cluster and tried the experiment in
>>>>> both Flink 1.4 and Flink 1.8.
>>>>> My goal was to simulate ingestion rate of 200 Clients/sec (Not going
>>>>> into the details here).
>>>>>
>>>>> In Flink 1.4 I could reach 200 Clients/Sec and I ran the cluster for 1
>>>>> hour. You can see the details in the attached Flink1.4 document file. You
>>>>> can see the GC activity and Cpu. Both are holding good.
>>>>>
>>>>> In Flin 1.8 I could reach only 160 Clients/Sec and the issue started
>>>>> happening. Issue started within 15 minutes of starting the ingestion. @Piotr
>>>>> Nowojski <pn...@apache.org> , you can see that there is no meta
>>>>> space related issue. All the GC related details are available in the doc.
>>>>>
>>>>> Especially see the difference in Heap dump of 'Biggest Objects' in
>>>>> both clusters. How Flink 1.4 holds lesser objects in Heap. Is it because
>>>>> Flink 1.4 was efficient and 1.8 solved that in efficiency and this problem
>>>>> is expected?.
>>>>>
>>>>> @Nicko, We are not doing the fat jar stuff.
>>>>>
>>>>> @Piotr Nowojski <pn...@apache.org> , we are in the process of
>>>>> upgrading to Java 11 and Flink 1.11. But I need at least 2 months.
>>>>>
>>>>>
>>>>> I am not getting the Finalizer problem in the latest heap dump. Maybe
>>>>> it was happening only 1 or 2 times.
>>>>>
>>>>> Please let me know if you need additional input
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Josson
>>>>>
>>>>>
>>>>> On Thu, Sep 10, 2020 at 5:19 AM Nico Kruber <nk...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> What looks a bit strange to me is that with a running job, the
>>>>>> SystemProcessingTimeService should actually not be collected (since
>>>>>> it is
>>>>>> still in use)!
>>>>>>
>>>>>> My guess is that something is indeed happening during that time frame
>>>>>> (maybe
>>>>>> job restarts?) and I would propose to check your logs for anything
>>>>>> suspicious
>>>>>> in there.
>>>>>>
>>>>>>
>>>>>> When I did experiments with Beam pipelines on our platform [1], I
>>>>>> also
>>>>>> noticed, that the standard fat jars that Beam creates include Flink
>>>>>> runtime
>>>>>> classes it shouldn't (at least if you are submitting to a separate
>>>>>> Flink
>>>>>> cluster). This can cause all sorts of problems and I would recommend
>>>>>> removing
>>>>>> those from the fat jar as documented in [1].
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Nico
>>>>>>
>>>>>>
>>>>>>
>>>>>> [1] https://ververica.zendesk.com/hc/en-us/articles/360014323099
>>>>>>
>>>>>> On Thursday, 10 September 2020 13:44:32 CEST Piotr Nowojski wrote:
>>>>>> > Hi Josson,
>>>>>> >
>>>>>> > Thanks again for the detailed answer, and sorry that I can not help
>>>>>> you
>>>>>> > with some immediate answer. I presume that jvm args for 1.8 are the
>>>>>> same?
>>>>>> >
>>>>>> > Can you maybe post what exactly has crashed in your cases a) and b)?
>>>>>> > Re c), in the previously attached word document, it looks like
>>>>>> Flink was
>>>>>> > running without problems for a couple of hours/minutes, everything
>>>>>> was
>>>>>> > stable, no signs of growing memory consumption, impending problem,
>>>>>> until
>>>>>> > around 23:15, when the problem started, right? Has something else
>>>>>> happened
>>>>>> > at that time, something that could explain the spike? A checkpoint?
>>>>>> Job
>>>>>> > crash/restart? Load spike?
>>>>>> >
>>>>>> > A couple of other random guesses:
>>>>>> > - have you been monitoring other memory pools for Flink 1.4 and
>>>>>> 1.8? Like
>>>>>> > meta space? Growing meta space size can sometimes cause problems. It
>>>>>> > shouldn't be the case here, as you configured XX:MaxMetaspaceSize,
>>>>>> but it
>>>>>> > might be still worth checking...
>>>>>> > - another random idea, have you tried upgrading JDK? Maybe that
>>>>>> would solve
>>>>>> > the problem?
>>>>>> >
>>>>>> > Best regards,
>>>>>> > Piotrek
>>>>>> >
>>>>>> > śr., 9 wrz 2020 o 19:53 Josson Paul <jo...@gmail.com>
>>>>>> napisał(a):
>>>>>> > > Hi Piotr,
>>>>>> > >
>>>>>> > >  *JVM start up for Flink 1.4*
>>>>>> > >
>>>>>> > > *-------------------------------*
>>>>>> > >
>>>>>> > >
>>>>>> java-server-XX:HeapDumpPath=/opt/maglev/srv/diagnostics/pipelineruntime-ta
>>>>>> > > skmgr-assurance-1-77d44cf64-z8gd4.heapdump-
>>>>>> > > *Xmx6554m-Xms6554m*-*XX:MaxMetaspaceSize=512m*
>>>>>> > > -XX:+HeapDumpOnOutOfMemoryError-*XX:+UseG1GC*-XX:CICompilerCount=4
>>>>>> > >
>>>>>> *-XX:MaxGCPauseMillis=1000*-XX:+DisableExplicitGC-*XX:ParallelGCThreads=4*
>>>>>> > > -Dsun.net.inetaddr.ttl=60-XX:OnOutOfMemoryError=kill -9
>>>>>> > > %p*-Dio.netty.eventLoopThreads=3*
>>>>>> > >
>>>>>> -Dlog4j.configurationFile=/opt/maglev/sw/apps/pipelineruntime/resources/lo
>>>>>> > >
>>>>>> g4j2.xml-Dorg.apache.flink.shaded.netty4.io.netty.eventLoopThreads=3-Dnetw
>>>>>> > > orkaddress.cache.ttl=120-Dnum.cores=3-
>>>>>> > >
>>>>>> *XX:+UseStringDeduplication-Djava.util.concurrent.ForkJoinPool.common.par
>>>>>> > > allelism=3-XX:ConcGCThreads=4 *
>>>>>> > >
>>>>>> -Djava.library.path=/usr/local/lib-Djava.net.preferIPv4Stack=true-Dapp.di
>>>>>> > >
>>>>>> r=/opt/maglev/sw/apps/pipelineruntime-Dserver.name=pipelineruntime-Dlog.di
>>>>>> > >
>>>>>> r=/opt/maglev/var/log/pipelineruntime-cp/opt/maglev/sw/apps/javacontainer/
>>>>>> > >
>>>>>> resources:/opt/maglev/sw/apps/pipelineruntime/lib/*:/opt/maglev/sw/apps/pi
>>>>>> > >
>>>>>> pelineruntime/resources:/opt/maglev/sw/apps/javacontainer/lib/*com.cisco.m
>>>>>> > > aglev.MaglevServerstartmaglev>
>>>>>> > >    1.   taskmanager.memory.fraction = 0.7f (This was coming to
>>>>>> 4.5 GB. I
>>>>>> > >    didn't know at that time that we could set memory fraction to
>>>>>> zero
>>>>>> > >    because
>>>>>> > >    ours is a streaming job. It was  picking up the default )
>>>>>> > >    2.    Network buffer pool memory was 646MB on the Heap (I
>>>>>> think this
>>>>>> > >    was the default based on some calculations in the Flink 1.4)
>>>>>> > >    3.    G1GC region size was 4MB (Default)
>>>>>> > >
>>>>>> > > I tested this setup by reducing the JVM heap by *1GB.* It still
>>>>>> worked
>>>>>> > > perfectly with some lags here and there.
>>>>>> > >
>>>>>> > > *JVM start up for Flink 1.8*
>>>>>> > > *------------------------------------*
>>>>>> > > a) I started with the same configuration as above. Kubenetis POD
>>>>>> went out
>>>>>> > > of memory. At this point I realized that in Flink 1.8  network
>>>>>> buffer
>>>>>> > > pools
>>>>>> > > are moved to native memory. Based on calculations it was coming
>>>>>> to 200MB
>>>>>> > > in
>>>>>> > > native  memory. I increased the overall POD memory to accommodate
>>>>>> the
>>>>>> > > buffer pool change keeping the *heap the same*.
>>>>>> > >
>>>>>> > > b) Even after I modified the overall POD memory,  the POD still
>>>>>> crashed.
>>>>>> > > At this point I generated Flame graphs to identify the CPU/Malloc
>>>>>> calls
>>>>>> > > (Attached as part of the initial email). Realized that cpu usage
>>>>>> of G1GC
>>>>>> > > is
>>>>>> > > significantly different from Flink 1.4. Now I made 2 changes
>>>>>> > >
>>>>>> > >    1.  taskmanager.memory.fraction = 0.01f (This will give more
>>>>>> heap for
>>>>>> > >    user code)
>>>>>> > >    2. Increased cpu from 3 to 4 cores.
>>>>>> > >
>>>>>> > >         Above changes helped to hold the cluster a little longer.
>>>>>> But it
>>>>>> > >
>>>>>> > > still crashed after sometime.
>>>>>> > >
>>>>>> > > c)  Now I made the below changes.
>>>>>> > >
>>>>>> > >    1. I came across this ->
>>>>>> > >
>>>>>> http://mail.openjdk.java.net/pipermail/hotspot-gc-use/2017-February/002
>>>>>> > >    622.html . Now I changed the G1GC region space to *8MB
>>>>>> *instead of the
>>>>>> > >    default 4MB*.*
>>>>>> > >    2. -XX:MaxGCPauseMillis=2000 (I even tried higher in later
>>>>>> experiments)
>>>>>> > >    3. Played around with G1RSetSparseRegionEntries
>>>>>> > >
>>>>>> > >        This helped to avoid the POD going out of memory. But the
>>>>>> Old Gen
>>>>>> > >
>>>>>> > > heap issue was very evident now (Please see the attached word
>>>>>> document).
>>>>>> > >
>>>>>> > >  d)  Allocated additional heap memory of *700 MB *along with the
>>>>>> above
>>>>>> > >
>>>>>> > > changes. This also didn't help. It just prolonged the crash.  Now
>>>>>> I need
>>>>>> > > help from others to which direction I want to take this to .
>>>>>> > >
>>>>>> > > My worry is even if I upgrade to flink 1.11 this issue might still
>>>>>> > > persist.
>>>>>> > >
>>>>>> > > I have attached a screenshot from Heap dump to show you the
>>>>>> difference
>>>>>> > > between Flink 1.4 and 1.8 the way HeapKeyedStateBackend is
>>>>>> created. Not
>>>>>> > > sure whether this change has something to do with this memory
>>>>>> issue that I
>>>>>> > > am facing.
>>>>>> > > Name Flink-1.4.jpg for the 1.4 and Flink-1.8.jpg for 1.8
>>>>>> > >
>>>>>> > >
>>>>>> > > Thanks,
>>>>>> > > Josson
>>>>>> > >
>>>>>> > > On Wed, Sep 9, 2020 at 5:44 AM Piotr Nowojski <
>>>>>> pnowojski@apache.org>
>>>>>> > >
>>>>>> > > wrote:
>>>>>> > >> Hi Josson,
>>>>>> > >>
>>>>>> > >> Thanks for getting back.
>>>>>> > >>
>>>>>> > >> What are the JVM settings and in particular GC settings that you
>>>>>> are
>>>>>> > >> using (G1GC?)?
>>>>>> > >> It could also be an issue that in 1.4 you were just slightly
>>>>>> below the
>>>>>> > >> threshold of GC issues, while in 1.8, something is using a bit
>>>>>> more
>>>>>> > >> memory,
>>>>>> > >> causing the GC issues to appear? Have you tried just increasing
>>>>>> the heap
>>>>>> > >> size?
>>>>>> > >> Have you tried to compare on the job start up, what is the usage
>>>>>> and size
>>>>>> > >> of JVM's memory pools with Flink 1.4 and 1.8? Maybe that can
>>>>>> point us in
>>>>>> > >> the right direction.
>>>>>> > >>
>>>>>> > >> > My understanding on back pressure is that it is not based on
>>>>>> Heap
>>>>>> > >>
>>>>>> > >> memory but based on how fast the Network buffers are filled. Is
>>>>>> this
>>>>>> > >> correct?.
>>>>>> > >>
>>>>>> > >> > Does Flink use TCP connection to communicate between tasks if
>>>>>> the tasks
>>>>>> > >>
>>>>>> > >> are in the same Task manager?.
>>>>>> > >>
>>>>>> > >> No, local input channels are being used then, but memory for
>>>>>> network
>>>>>> > >> buffers is assigned to tasks regardless of the fraction of local
>>>>>> input
>>>>>> > >> channels in the task. However with just single taskmanager and
>>>>>> > >> parallelism
>>>>>> > >> of 4, the amount of the memory used by the network stack should
>>>>>> be
>>>>>> > >> insignificant, at least as long as you have a reasonably sized
>>>>>> job graph
>>>>>> > >> (32KB * (2 * parallelism + 7) * number of tasks).
>>>>>> > >>
>>>>>> > >> > What I noticed in Flink 1.4 is that it doesn't read data from
>>>>>> Kafka if
>>>>>> > >>
>>>>>> > >> there is not sufficient heap memory to process data. Somehow
>>>>>> this is not
>>>>>> > >> happening in Flink 1.8 and it fills the heap soon enough not to
>>>>>> get
>>>>>> > >> GCed/Finalized. Any change around this between Flink 1.4 and
>>>>>> Flink 1.8.
>>>>>> > >>
>>>>>> > >> No, there were no changes in this part as far as I remember.
>>>>>> Tasks when
>>>>>> > >> producing records are serialising them and putting into the
>>>>>> network
>>>>>> > >> buffers. If there are no available network buffers, the task is
>>>>>> back
>>>>>> > >> pressuring and stops processing new records.
>>>>>> > >>
>>>>>> > >> Best regards,
>>>>>> > >> Piotrek
>>>>>> > >>
>>>>>> > >> wt., 8 wrz 2020 o 21:51 Josson Paul <jo...@gmail.com>
>>>>>> napisał(a):
>>>>>> > >>> Hi Piotr,
>>>>>> > >>>
>>>>>> > >>>    2) SystemProcessingTimeService holds the
>>>>>> HeapKeyedStateBackend and
>>>>>> > >>>
>>>>>> > >>> HeapKeyedStateBackend has lot of Objects and that is filling
>>>>>> the Heap
>>>>>> > >>>
>>>>>> > >>>    3) I am not using Flink Kafka Connector. But we are using
>>>>>> Apache Beam
>>>>>> > >>>
>>>>>> > >>> kafka connector.  There is a change in the Apache Beam version.
>>>>>> But the
>>>>>> > >>> kafka client we are using is the same as the one which was
>>>>>> working in
>>>>>> > >>> the
>>>>>> > >>> other cluster where  Flink was 1.4.
>>>>>> > >>>
>>>>>> > >>>   *There is no change in Hardware/Java/Kafka/Kafka
>>>>>> Client/Application
>>>>>> > >>>
>>>>>> > >>> between the cluster which is working and not working*
>>>>>> > >>>
>>>>>> > >>> I am aware of the memory changes and network buffer changes
>>>>>> between 1.4
>>>>>> > >>> and 1.8.
>>>>>> > >>>
>>>>>> > >>> Flink 1.4 had network buffers on Heap and 1.8 network buffers
>>>>>> are on the
>>>>>> > >>> native memory. I modified the Flink 1.8 code to put it back to
>>>>>> Heap
>>>>>> > >>> memory
>>>>>> > >>> but the issue didn't get resolved.
>>>>>> > >>>
>>>>>> > >>> Mine is a streaming job so we set 'taskmanager.memory.fraction'
>>>>>> to very
>>>>>> > >>> minimal and that heap is fully available for user data.
>>>>>> > >>>
>>>>>> > >>> Flink 1.4 was not using Credit based Flow control and Flink 1.8
>>>>>> uses
>>>>>> > >>> Credit based Flow control. *Our set up has only 1 task manager
>>>>>> and 4
>>>>>> > >>> parallelisms*.  According to this video
>>>>>> > >>>
>>>>>> https://www.youtube.com/watch?v=AbqatHF3tZI&ab_channel=FlinkForward (
>>>>>> > >>> *16:21*) if tasks are in same task manager,  Flink doesn't use
>>>>>> Credit
>>>>>> > >>> Based Flow control. Essentially no change between Flink 1.4 and
>>>>>> 1.8 in
>>>>>> > >>> *our
>>>>>> > >>> set up*. Still I tried to change the Credit Based Flow Control
>>>>>> to False
>>>>>> > >>> and test my setup. The problem persists.
>>>>>> > >>>
>>>>>> > >>> What I noticed in Flink 1.4 is that it doesn't read data from
>>>>>> Kafka if
>>>>>> > >>> there is not sufficient heap memory to process data. Somehow
>>>>>> this is not
>>>>>> > >>> happening in Flink 1.8 and it fills the heap soon enough not to
>>>>>> get
>>>>>> > >>> GCed/Finalized. Any change around this between Flink 1.4 and
>>>>>> Flink 1.8.
>>>>>> > >>>
>>>>>> > >>> My understanding on back pressure is that it is not based on
>>>>>> Heap memory
>>>>>> > >>> but based on how fast the Network buffers are filled. Is this
>>>>>> correct?.
>>>>>> > >>> Does Flink use TCP connection to communicate between tasks if
>>>>>> the tasks
>>>>>> > >>> are in the same Task manager?.
>>>>>> > >>>
>>>>>> > >>> Thanks,
>>>>>> > >>> josson
>>>>>> > >>>
>>>>>> > >>> On Thu, Sep 3, 2020 at 12:35 PM Piotr Nowojski <
>>>>>> pnowojski@apache.org>
>>>>>> > >>>
>>>>>> > >>> wrote:
>>>>>> > >>>> Hi Josson,
>>>>>> > >>>>
>>>>>> > >>>> 2. Are you sure that all/vast majority of those objects are
>>>>>> pointing
>>>>>> > >>>> towards SystemProcessingTimeService? And is this really the
>>>>>> problem of
>>>>>> > >>>> those objects? Are they taking that much of the memory?
>>>>>> > >>>> 3. It still could be Kafka's problem, as it's likely that
>>>>>> between 1.4
>>>>>> > >>>> and 1.8.x we bumped Kafka dependencies.
>>>>>> > >>>>
>>>>>> > >>>> Frankly if that's not some other external dependency issue, I
>>>>>> would
>>>>>> > >>>> expect that the problem might lie somewhere completely else.
>>>>>> Flink's
>>>>>> > >>>> code
>>>>>> > >>>> relaying on the finalisation hasn't changed since 2015/2016.
>>>>>> On the
>>>>>> > >>>> other
>>>>>> > >>>> hand there were quite a bit of changes between 1.4 and 1.8.x,
>>>>>> some of
>>>>>> > >>>> them
>>>>>> > >>>> were affecting memory usage. Have you read release notes for
>>>>>> versions
>>>>>> > >>>> 1.5,
>>>>>> > >>>> 1.6, 1.7 and 1.8? In particular both 1.5 [1] and 1.8 [2] have
>>>>>> memory
>>>>>> > >>>> related notes that could be addressed via configuration
>>>>>> changes.
>>>>>> > >>>>
>>>>>> > >>>> Thanks,
>>>>>> > >>>> Piotrek
>>>>>> > >>>>
>>>>>> > >>>> [1]
>>>>>> > >>>>
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-not
>>>>>> > >>>> es/flink-1.5.html [2]
>>>>>> > >>>>
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/release-not
>>>>>> > >>>> es/flink-1.8.html>>>>
>>>>>> > >>>> czw., 3 wrz 2020 o 18:50 Josson Paul <jo...@gmail.com>
>>>>>> napisał(a):
>>>>>> > >>>>> 1) We are in the process of migrating to Flink 1.11. But it
>>>>>> is going
>>>>>> > >>>>> to take a while before we can make everything work with the
>>>>>> latest
>>>>>> > >>>>> version.
>>>>>> > >>>>> Meanwhile since this is happening in production I am trying
>>>>>> to solve
>>>>>> > >>>>> this.
>>>>>> > >>>>> 2) Finalizae class is pointing
>>>>>> > >>>>> to
>>>>>> > >>>>>
>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService
>>>>>> > >>>>> .
>>>>>> > >>>>> This class has a finalize method. I have attached spreadsheet
>>>>>> (
>>>>>> > >>>>> *Object-explorer.csv*) to give you a high level view
>>>>>> > >>>>> 3) The difference between working cluster and NON working
>>>>>> cluster is
>>>>>> > >>>>> only on Beam and Flink. Hardware, Input message rate,
>>>>>> Application
>>>>>> > >>>>> jars,
>>>>>> > >>>>> Kafka are all the same between those 2 clusters. Working
>>>>>> cluster was
>>>>>> > >>>>> with
>>>>>> > >>>>> Flink 1.4 and Beam 2.4.0
>>>>>> > >>>>>
>>>>>> > >>>>> Any insights into this will help me to debug further
>>>>>> > >>>>>
>>>>>> > >>>>> Thanks,
>>>>>> > >>>>> Josson
>>>>>> > >>>>>
>>>>>> > >>>>>
>>>>>> > >>>>> On Thu, Sep 3, 2020 at 3:34 AM Piotr Nowojski <
>>>>>> pnowojski@apache.org>
>>>>>> > >>>>>
>>>>>> > >>>>> wrote:
>>>>>> > >>>>>> Hi,
>>>>>> > >>>>>>
>>>>>> > >>>>>> Have you tried using a more recent Flink version? 1.8.x is
>>>>>> no longer
>>>>>> > >>>>>> supported, and latest versions might not have this issue
>>>>>> anymore.
>>>>>> > >>>>>>
>>>>>> > >>>>>> Secondly, have you tried backtracking those references to the
>>>>>> > >>>>>> Finalizers? Assuming that Finalizer is indeed the class
>>>>>> causing
>>>>>> > >>>>>> problems.
>>>>>> > >>>>>>
>>>>>> > >>>>>> Also it may well be a non Flink issue [1].
>>>>>> > >>>>>>
>>>>>> > >>>>>> Best regards,
>>>>>> > >>>>>> Piotrek
>>>>>> > >>>>>>
>>>>>> > >>>>>> [1] https://issues.apache.org/jira/browse/KAFKA-8546
>>>>>> > >>>>>>
>>>>>> > >>>>>> czw., 3 wrz 2020 o 04:47 Josson Paul <jo...@gmail.com>
>>>>>> > >>>>>>
>>>>>> > >>>>>> napisał(a):
>>>>>> > >>>>>>> Hi All,
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> *ISSUE*
>>>>>> > >>>>>>> ------
>>>>>> > >>>>>>> Flink application runs for sometime and suddenly the CPU
>>>>>> shoots up
>>>>>> > >>>>>>> and touches the peak, POD memory reaches to the peak, GC
>>>>>> count
>>>>>> > >>>>>>> increases,
>>>>>> > >>>>>>> Old-gen spaces reach close to 100%. Full GC doesn't clean
>>>>>> up heap
>>>>>> > >>>>>>> space. At
>>>>>> > >>>>>>> this point I stopped sending the data and cancelled the
>>>>>> Flink Jobs.
>>>>>> > >>>>>>> Still
>>>>>> > >>>>>>> the Old-Gen space doesn't come down. I took a heap dump and
>>>>>> can see
>>>>>> > >>>>>>> that
>>>>>> > >>>>>>> lot of Objects in the java.lang.Finalizer class. I have
>>>>>> attached the
>>>>>> > >>>>>>> details in a word document. I do have the heap dump but it
>>>>>> is close
>>>>>> > >>>>>>> to 2GB
>>>>>> > >>>>>>> of compressed size. Is it safe to upload somewhere and
>>>>>> share it
>>>>>> > >>>>>>> here?.
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> This issue doesn't happen in Flink: 1.4.0 and Beam:
>>>>>> release-2.4.0
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> *WORKING CLUSTER INFO* (Flink: 1.4.0 and Beam:
>>>>>> release-2.4.0)
>>>>>> > >>>>>>> ----------------------------------------------------
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> Application reads from Kafka and does aggregations and
>>>>>> writes into
>>>>>> > >>>>>>> Kafka. Application has 5 minutes windows. Application uses
>>>>>> Beam
>>>>>> > >>>>>>> constructs
>>>>>> > >>>>>>> to build the pipeline. To read and write we use Beam
>>>>>> connectors.
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> Flink version: 1.4.0
>>>>>> > >>>>>>> Beam version: release-2.4.0
>>>>>> > >>>>>>> Backend State: State backend is in the Heap and check
>>>>>> pointing
>>>>>> > >>>>>>> happening to the distributed File System.
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> No of task Managers: 1
>>>>>> > >>>>>>> Heap: 6.4 GB
>>>>>> > >>>>>>> CPU: 4 Cores
>>>>>> > >>>>>>> Standalone cluster deployment on a Kubernetes pod
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> *NOT WORKING CLUSTER INFO* (Flink version: 1.8.3 and Beam
>>>>>> version:
>>>>>> > >>>>>>> release-2.15.0)
>>>>>> > >>>>>>> ----------
>>>>>> > >>>>>>> Application details are same as above
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> *No change in application and the rate at which data is
>>>>>> injected.
>>>>>> > >>>>>>> But change in Flink and Beam versions*
>>>>>> > >>>>>>>
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> Flink version: 1.8.3
>>>>>> > >>>>>>> Beam version: release-2.15.0
>>>>>> > >>>>>>> Backend State: State backend is in the Heap and check
>>>>>> pointing
>>>>>> > >>>>>>> happening to the distributed File System.
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> No of task Managers: 1
>>>>>> > >>>>>>> Heap: 6.5 GB
>>>>>> > >>>>>>> CPU: 4 Cores
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> Deployment: Standalone cluster deployment on a Kubernetes
>>>>>> pod
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> My Observations
>>>>>> > >>>>>>> -------------
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> 1) CPU flame graph shows that in the working version, the
>>>>>> cpu time
>>>>>> > >>>>>>> on GC is lesser compared to non-working version (Please see
>>>>>> the
>>>>>> > >>>>>>> attached
>>>>>> > >>>>>>> Flame Graph. *CPU-flame-WORKING.svg* for working cluster and
>>>>>> > >>>>>>> *CPU-flame-NOT-working.svg*)
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> 2) I have attached the flame graph for native memory MALLOC
>>>>>> calls
>>>>>> > >>>>>>> when the issue was happening. Please find the attached SVG
>>>>>> image (
>>>>>> > >>>>>>> *malloc-NOT-working.svg*). The POD memory peaks when this
>>>>>> issue
>>>>>> > >>>>>>> happens. For me, it looks like the GC process is requesting
>>>>>> a lot of
>>>>>> > >>>>>>> native
>>>>>> > >>>>>>> memory.
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> 3) When the issue is happening the GC cpu usage is very
>>>>>> high. Please
>>>>>> > >>>>>>> see the flame graph (*CPU-graph-at-issuetime.svg*)
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> Note: SVG file can be opened using any browser and it is
>>>>>> clickable
>>>>>> > >>>>>>> while opened.
>>>>>> > >>>>>>> --
>>>>>> > >>>>>>> Thanks
>>>>>> > >>>>>>> Josson
>>>>>> > >>>>>
>>>>>> > >>>>> --
>>>>>> > >>>>> Thanks
>>>>>> > >>>>> Josson
>>>>>> > >>>
>>>>>> > >>> --
>>>>>> > >>> Thanks
>>>>>> > >>> Josson
>>>>>> > >
>>>>>> > > --
>>>>>> > > Thanks
>>>>>> > > Josson
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Thanks
>>>>> Josson
>>>>>
>>>>
>>>
>>> --
>>> Thanks
>>> Josson
>>>
>>
>
> --
> Thanks
> Josson
>