Posted to user@flink.apache.org by Till Rohrmann <tr...@apache.org> on 2017/09/04 09:47:12 UTC

Re: Streaming job gets slower and slower

Hi Aparup,

the slowdown can have multiple causes. One could be that your computation
in Timeseries-Analytics becomes more expensive over time and therefore
slows down, resulting in back pressure at the sources. This could, for
example, be caused by accumulating a large state. So, if your computation
is stateful, which state backend are you using?
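To illustrate why a slow downstream operator shows up as back pressure at the sources, here is a minimal plain-Java sketch (not Flink code) that models the network buffers between two tasks as a small bounded queue. The capacity of 4, the per-record cost, and the class and method names are hypothetical. Once all buffers are in use, the producer can only emit as fast as the consumer frees them, which is roughly the situation of a source thread waiting in `LocalBufferPool.requestBuffer`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BackPressureSketch {

    // Hypothetical stand-in for the small, fixed pool of network buffers between two tasks.
    static final int BUFFER_CAPACITY = 4;

    /**
     * A producer ("source") emits `records` items to a consumer ("downstream operator")
     * that spends `costMs` per item. Returns how many times the producer found every
     * buffer in use and had to wait.
     */
    static long run(int records, long costMs) {
        BlockingQueue<Integer> buffers = new ArrayBlockingQueue<>(BUFFER_CAPACITY);
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < records; i++) {
                    buffers.take();       // downstream frees one buffer...
                    Thread.sleep(costMs); // ...then spends costMs processing the record
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        long waits = 0;
        try {
            for (int i = 0; i < records; i++) {
                // Loosely mirrors the timed wait in LocalBufferPool.requestBuffer:
                // wait briefly for a free buffer and retry until one is available.
                while (!buffers.offer(i, 1, TimeUnit.MILLISECONDS)) {
                    waits++;
                }
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while emitting", e);
        }
        return waits;
    }

    public static void main(String[] args) {
        // With a slow consumer, the source can only go as fast as buffers are freed.
        System.out.println("waits with a slow consumer: " + run(50, 5));
    }
}
```

In this model a growing per-record cost (for example, from ever larger state) directly reduces how often a buffer is freed, so the wait count at the producer rises even though the input rate is unchanged.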

Another cause could be garbage collection, which only kicks in after some
time. You can get information about garbage collection activity by
enabling the configuration settings described in [1]. This could, for
example, reveal that you create a lot of objects, which might be optimized.
It could also indicate that you keep references to old objects, which then
cannot be garbage collected. To debug this, you could also take a heap dump
of the running program while it is slow.
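The settings in [1] have the TaskManagers report GC information themselves. For illustration only, here is a plain-Java sketch of the same kind of measurement using the standard `java.lang.management` API; it observes only the JVM it runs in, and the class name, number of samples, and sampling interval are hypothetical. A heap dump, in turn, can be taken from outside the process, for example with `jmap -dump:live,format=b,file=heap.hprof <pid>`.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

public class GcStatsSketch {

    /** Cumulative GC activity since JVM start, summed over all collectors. */
    static final class GcSnapshot {
        final long collections;
        final long timeMs;

        GcSnapshot(long collections, long timeMs) {
            this.collections = collections;
            this.timeMs = timeMs;
        }
    }

    static GcSnapshot snapshot() {
        long collections = 0;
        long timeMs = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // Both getters return -1 if the value is undefined for this collector.
            collections += Math.max(0, gc.getCollectionCount());
            timeMs += Math.max(0, gc.getCollectionTime());
        }
        return new GcSnapshot(collections, timeMs);
    }

    public static void main(String[] args) throws InterruptedException {
        GcSnapshot previous = snapshot();
        for (int i = 0; i < 3; i++) { // a real job would be sampled over hours
            Thread.sleep(1000);
            GcSnapshot current = snapshot();
            MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
            // GC time per interval that keeps rising while used heap stays high can
            // point to objects that are still referenced and cannot be collected.
            System.out.printf("GC: +%d collections, +%d ms; heap used: %d MB%n",
                    current.collections - previous.collections,
                    current.timeMs - previous.timeMs,
                    heap.getUsed() >> 20);
            previous = current;
        }
    }
}
```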

Could you also try upgrading to Flink 1.3.2 to see whether the problem
persists there?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#memory-and-performance-debugging

Cheers,
Till

On Sun, Aug 27, 2017 at 9:03 AM, Aparup Banerjee (apbanerj) <apbanerj@cisco.com> wrote:

> We have a Flink job with 7 subtasks; see the graph below. This is on
> Flink 1.2.
>
> [graph not preserved in this plain-text version]
>
> Each source task consumes from a Kafka topic. The data rate is low, around
> 70-80 messages per second. What we notice is that after running for 2 hours
> or so, the source tasks start showing back pressure. A thread dump shows a
> bunch of blocked threads like the following. Any idea what could be going on
> here?
>
> "OutputFlusher" #87 daemon prio=5 os_prio=0 tid=0x0000000000b3b000 nid=0x73 waiting for monitor entry [0x00007f1d03af9000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(RecordWriter.java:175)
>         - waiting to lock <0x000000060076fa80> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
>         at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:185)
>
> Waiting on:
>
> "Time Trigger for Source:stream://com.tesseract.com/snmp_generic/envtemp -> FlatMap -> Map (1/1)" #92 daemon prio=5 os_prio=0 tid=0x0000000000c13000 nid=0x79 in Object.wait() [0x00007f1d032e5000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
>         - locked <0x00000006063b5aa8> (a java.util.ArrayDeque)
>         at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:138)
>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>         - locked <0x000000060076fa80> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:106)
>         at org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:104)
>         at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:96)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:821)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:681)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:663)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:389)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:821)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:681)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:663)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:389)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:821)
>         at org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:142)
>         at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:256)
>         - locked <0x00000006038fc128> (a java.lang.Object)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> "Source:stream://com.tesseract.com/metricstream/v1 -> FlatMap -> Map (1/1)" #76 daemon prio=5 os_prio=0 tid=0x00007f1d240b5000 nid=0x68 in Object.wait() [0x00007f1d08b4e000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
>         - locked <0x0000000602c8daa8> (a java.util.ArrayDeque)
>         at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:138)
>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>         - locked <0x0000000606572e98> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>         at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
>         at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:72)
>         at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:827)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:805)
>         at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:422)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:407)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:827)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:805)
>         at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>         at com.cisco.ndp.cep.impl.flinksiddhi.source.SourceToTupleFunction.flatMap(SourceToTupleFunction.java:142)
>         at com.cisco.ndp.cep.impl.flinksiddhi.source.SourceToTupleFunction.flatMap(SourceToTupleFunction.java:40)
>         at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:422)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:407)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:827)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:805)
>         at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:84)
>         - locked <0x000000060387f028> (a java.lang.Object)
>         at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:245)
>         - locked <0x000000060387f028> (a java.lang.Object)
>         at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.emitRecord(Kafka09Fetcher.java:198)
>         at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:157)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:256)
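Reading the dump: the `Time Trigger` thread holds the `SpanningRecordSerializer` monitor `<0x000000060076fa80>` (taken in `RecordWriter.sendToTarget`) while it waits for a network buffer in `LocalBufferPool.requestBuffer`, and `OutputFlusher` is BLOCKED waiting for that same monitor. The source thread is likewise waiting for a buffer, which appears consistent with back pressure from downstream rather than a lock problem of its own. The plain-Java sketch below recreates that lock pattern with two hypothetical stand-in objects (no Flink classes; the thread names are made up) and finds each blocked thread and its lock owner through `ThreadMXBean`, the same relation a `jstack` dump prints.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class LockChainSketch {

    /** Lists "blocked thread -> lock owner" pairs, the relation a thread dump shows. */
    static List<String> blockedChains() {
        List<String> chains = new ArrayList<>();
        for (ThreadInfo info : ManagementFactory.getThreadMXBean().dumpAllThreads(false, false)) {
            if (info != null && info.getThreadState() == Thread.State.BLOCKED) {
                chains.add(info.getThreadName() + " -> " + info.getLockOwnerName());
            }
        }
        return chains;
    }

    /** Recreates the lock pattern from the dump and returns what blockedChains() sees. */
    static List<String> demo() throws InterruptedException {
        Object serializer = new Object(); // hypothetical stand-in for SpanningRecordSerializer
        Object bufferPool = new Object(); // hypothetical stand-in for LocalBufferPool's queue
        boolean[] released = {false};     // "a buffer became available"; guarded by bufferPool
        CountDownLatch serializerLocked = new CountDownLatch(1);

        Thread emitter = new Thread(() -> {
            synchronized (serializer) {           // like sendToTarget: lock the serializer
                serializerLocked.countDown();
                synchronized (bufferPool) {       // like requestBuffer: wait for a buffer
                    while (!released[0]) {
                        try {
                            bufferPool.wait(100); // TIMED_WAITING; serializer stays locked
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }
            }
        }, "TimeTrigger-sketch");
        Thread flusher = new Thread(() -> {
            synchronized (serializer) {           // like RecordWriter.flush
                // nothing to flush in this sketch
            }
        }, "OutputFlusher-sketch");

        emitter.start();
        serializerLocked.await();                 // emitter now owns the serializer
        flusher.start();
        while (flusher.getState() != Thread.State.BLOCKED) {
            Thread.sleep(10);
        }
        List<String> chains = blockedChains();

        synchronized (bufferPool) {               // free a buffer so both threads finish
            released[0] = true;
            bufferPool.notifyAll();
        }
        emitter.join();
        flusher.join();
        return chains;
    }

    public static void main(String[] args) throws InterruptedException {
        demo().forEach(System.out::println);
    }
}
```

Note that `Object.wait` releases only the monitor it is called on, so the emitter keeps the serializer locked for the whole time it waits, just as in the dump.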