Posted to user@spark.apache.org by Udo Fholl <ud...@gmail.com> on 2016/02/03 18:52:50 UTC

Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

Hi all,

I recently migrated from 'updateStateByKey' to 'mapWithState' and now I see
a huge increase in memory usage. Most of it is a massive "BlockGenerator"
(which points to a massive "ArrayBlockingQueue" that in turn points to a
huge "Object[]").

I'm pretty sure it has to do with my code, but I barely changed anything in
the code; I just adapted the state function, roughly along the lines sketched
below.
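
For illustration, here is a minimal sketch of the shape of such a migration.
The key/value types, the 'pairs' stream, and the update logic are all
hypothetical placeholders, not the actual code from this thread:

  import org.apache.spark.streaming.{State, StateSpec}
  import org.apache.spark.streaming.dstream.DStream

  // A keyed stream of counts, assumed for the example.
  val pairs: DStream[(String, Long)] = ???

  // Before: updateStateByKey carries the full state map through every batch.
  def updateFunc(newValues: Seq[Long], state: Option[Long]): Option[Long] =
    Some(newValues.sum + state.getOrElse(0L))
  val countsOld = pairs.updateStateByKey(updateFunc)

  // After: mapWithState updates only the keys seen in each batch, via a StateSpec.
  def mappingFunc(key: String, value: Option[Long], state: State[Long]): (String, Long) = {
    val sum = value.getOrElse(0L) + state.getOption.getOrElse(0L)
    state.update(sum)
    (key, sum)
  }
  val countsNew = pairs.mapWithState(StateSpec.function(mappingFunc _))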

Did anyone run into this?

Best regards,
Udo.

Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

Posted by Udo Fholl <ud...@gmail.com>.
Sorry, I realized I left out a bit of the last email.

This is the only BLOCKED thread in the dump. The Reference Handler is blocked,
most likely because the GC was running at the moment the dump was taken.

"Reference Handler" daemon prio=10 tid=2 BLOCKED
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
  at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)


On Fri, Feb 5, 2016 at 10:44 AM, Udo Fholl <ud...@gmail.com> wrote:

> It does not look like it. Here is the output of "grep -A2 -i waiting
> spark_tdump.log":
>
> "RMI TCP Connection(idle)" daemon prio=5 tid=156 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "task-result-getter-1" daemon prio=5 tid=101 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "BLOCK_MANAGER cleanup timer" daemon prio=5 tid=46 WAITING
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
> --
> "context-cleaner-periodic-gc" daemon prio=5 tid=69 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "qtp512934838-58" daemon prio=5 tid=58 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "dispatcher-event-loop-3" daemon prio=5 tid=22 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "RMI TCP Connection(idle)" daemon prio=5 tid=150 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "submit-job-thread-pool-0" daemon prio=5 tid=83 WAITING
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
> --
> "cw-metrics-publisher" daemon prio=5 tid=90 TIMED_WAITING
>   at java.lang.Object.wait(Native Method)
>   at
> com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable.runOnce(CWPublisherRunnable.java:136)
> --
> "qtp512934838-57" daemon prio=5 tid=57 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "sparkDriverActorSystem-akka.remote.default-remote-dispatcher-19" daemon
> prio=5 tid=193 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
> --
> "dispatcher-event-loop-2" daemon prio=5 tid=21 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "qtp512934838-56" daemon prio=5 tid=56 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "BROADCAST_VARS cleanup timer" daemon prio=5 tid=47 WAITING
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
> --
> "pool-1-thread-1" prio=5 tid=16 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "dispatcher-event-loop-0" daemon prio=5 tid=19 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "RecurringTimer - Kinesis Checkpointer - Worker
> localhost:7b412e3a-f7c8-466d-90f1-deaad8656884" daemon prio=5 tid=89
> TIMED_WAITING
>   at java.lang.Thread.sleep(Native Method)
>   at org.apache.spark.util.SystemClock.waitTillTime(Clock.scala:63)
> --
> "qtp512934838-55" daemon prio=5 tid=55 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "Executor task launch worker-0" daemon prio=5 tid=84 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "qtp512934838-54" daemon prio=5 tid=54 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "pool-28-thread-1" prio=5 tid=92 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "sparkDriverActorSystem-akka.remote.default-remote-dispatcher-18" daemon
> prio=5 tid=185 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at
> scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
> --
> "Spark Context Cleaner" daemon prio=5 tid=68 TIMED_WAITING
>   at java.lang.Object.wait(Native Method)
>   at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> --
> "qtp512934838-53" daemon prio=5 tid=53 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "SparkListenerBus" daemon prio=5 tid=18 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "block-manager-slave-async-thread-pool-6" daemon prio=5 tid=179
> TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "RMI Scheduler(0)" daemon prio=5 tid=151 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "Executor task launch worker-1" daemon prio=5 tid=99 WAITING
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
> --
> "block-manager-ask-thread-pool-4" daemon prio=5 tid=180 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "pool-28-thread-2" prio=5 tid=93 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "MAP_OUTPUT_TRACKER cleanup timer" daemon prio=5 tid=45 WAITING
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
> --
> "ForkJoinPool-3-worker-5" daemon prio=5 tid=190 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
> --
> "java-sdk-http-connection-reaper" daemon prio=5 tid=75 TIMED_WAITING
>   at java.lang.Thread.sleep(Native Method)
>   at
> com.amazonaws.http.IdleConnectionReaper.run(IdleConnectionReaper.java:112)
> --
> "pool-25-thread-1" prio=5 tid=96 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "StreamingListenerBus" daemon prio=5 tid=71 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "JMX server connection timeout 152" daemon prio=5 tid=152 TIMED_WAITING
>   at java.lang.Object.wait(Native Method)
>   at
> com.sun.jmx.remote.internal.ServerCommunicatorAdmin$Timeout.run(ServerCommunicatorAdmin.java:168)
> --
> "wal-batching-thread-pool-0" daemon prio=5 tid=137 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "block-manager-slave-async-thread-pool-7" daemon prio=5 tid=181
> TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "process reaper" daemon prio=10 tid=97 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "block-manager-ask-thread-pool-5" daemon prio=5 tid=183 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "netty-rpc-env-timeout" daemon prio=5 tid=60 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "block-manager-slave-async-thread-pool-8" daemon prio=5 tid=182
> TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "RefreshMaterialTask" daemon prio=5 tid=74 TIMED_WAITING
>   at java.lang.Object.wait(Native Method)
>   at java.util.TimerThread.mainLoop(Timer.java:552)
> --
> "Kinesis Receiver 0" daemon prio=5 tid=91 TIMED_WAITING
>   at java.lang.Thread.sleep(Native Method)
>   at
> com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:347)
> --
> "RecurringTimer - BlockGenerator" daemon prio=5 tid=85 TIMED_WAITING
>   at java.lang.Thread.sleep(Native Method)
>   at org.apache.spark.util.SystemClock.waitTillTime(Clock.scala:63)
> --
> "driver-heartbeater" daemon prio=5 tid=63 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "sparkDriverActorSystem-scheduler-1" daemon prio=5 tid=32 TIMED_WAITING
>   at java.lang.Thread.sleep(Native Method)
>   at akka.actor.LightArrayRevolverScheduler.waitNanos(Scheduler.scala:226)
> --
> "sparkDriverActorSystem-akka.actor.default-dispatcher-4" daemon prio=5
> tid=35 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
> --
> "task-result-getter-2" daemon prio=5 tid=159 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "streaming-job-executor-0" daemon prio=5 tid=95 WAITING
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
> --
> "dag-scheduler-event-loop" daemon prio=5 tid=62 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "task-result-getter-3" daemon prio=5 tid=161 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "Finalizer" daemon prio=8 tid=3 WAITING
>   at java.lang.Object.wait(Native Method)
>   at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> --
> "Thread-19" daemon prio=5 tid=86 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "BatchedWriteAheadLog Writer" daemon prio=5 tid=81 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "heartbeat-receiver-event-loop-thread" daemon prio=5 tid=59 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "RecurringTimer - JobGenerator" daemon prio=5 tid=70 TIMED_WAITING
>   at java.lang.Thread.sleep(Native Method)
>   at org.apache.spark.util.SystemClock.waitTillTime(Clock.scala:63)
> --
> "ForkJoinPool-3-worker-7" daemon prio=5 tid=194 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
> --
> "task-result-getter-0" daemon prio=5 tid=100 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "main" prio=5 tid=1 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "JobScheduler" daemon prio=5 tid=80 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "RecurringTimer - BlockGenerator" daemon prio=5 tid=87 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "dispatcher-event-loop-1" daemon prio=5 tid=20 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "JobGenerator" daemon prio=5 tid=82 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "refresh progress" daemon prio=5 tid=49 TIMED_WAITING
>   at java.lang.Object.wait(Native Method)
>   at java.util.TimerThread.mainLoop(Timer.java:552)
> --
> "SPARK_CONTEXT cleanup timer" daemon prio=5 tid=48 WAITING
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
> --
> "Timer-0" daemon prio=5 tid=61 WAITING
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
>
>
> On Fri, Feb 5, 2016 at 1:31 AM, Shixiong(Ryan) Zhu <
> shixiong@databricks.com> wrote:
>
>> I guess it may be a deadlock in BlockGenerator. Could you check it
>> yourself?
>>
>> On Thu, Feb 4, 2016 at 4:14 PM, Udo Fholl <ud...@gmail.com> wrote:
>>
>>> Thank you for your response.
>>>
>>> Unfortunately I cannot share a thread dump. What are you looking for
>>> exactly?
>>>
>>> Here is the list of the 50 biggest objects (retained size order,
>>> descending):
>>>
>>> java.util.concurrent.ArrayBlockingQueue#
>>> java.lang.Object[]#
>>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>>> scala.collection.mutable.ArrayBuffer#
>>> java.lang.Object[]#
>>> org.apache.spark.streaming.receiver.BlockGenerator#
>>> scala.collection.mutable.ArrayBuffer#
>>> java.lang.Object[]#
>>> scala.concurrent.forkjoin.ForkJoinPool#
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue[]#
>>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>>> scala.collection.mutable.ArrayBuffer#
>>> java.lang.Object[]#
>>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>>> scala.collection.mutable.ArrayBuffer#
>>> java.lang.Object[]#
>>> org.apache.spark.storage.MemoryStore#
>>> java.util.LinkedHashMap#
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
>>> scala.concurrent.forkjoin.ForkJoinTask[]#
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
>>> scala.concurrent.forkjoin.ForkJoinTask[]#
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
>>> scala.concurrent.forkjoin.ForkJoinTask[]#
>>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>>> scala.collection.mutable.ArrayBuffer#
>>> java.lang.Object[]#
>>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>>> scala.collection.mutable.ArrayBuffer#
>>> java.lang.Object[]#
>>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>>> scala.collection.mutable.ArrayBuffer#
>>> java.lang.Object[]#
>>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>>> scala.collection.mutable.ArrayBuffer#
>>> java.lang.Object[]#
>>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>>> scala.collection.mutable.ArrayBuffer#
>>> java.lang.Object[]#
>>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>>> scala.collection.mutable.ArrayBuffer#
>>> java.lang.Object[]#
>>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>>> scala.collection.mutable.ArrayBuffer#
>>> java.lang.Object[]#
>>> scala.collection.Iterator$$anon$
>>> org.apache.spark.InterruptibleIterator#
>>> scala.collection.IndexedSeqLike$Elements#
>>> scala.collection.mutable.ArrayOps$ofRef#
>>> java.lang.Object[]#
>>>
>>>
>>>
>>>
>>> On Thu, Feb 4, 2016 at 7:14 PM, Shixiong(Ryan) Zhu <
>>> shixiong@databricks.com> wrote:
>>>
>>>> Hey Udo,
>>>>
>>>> mapWithState usually uses much more memory than updateStateByKey since
>>>> it caches the states in memory.
>>>>
>>>> However, from your description, it looks like BlockGenerator cannot push
>>>> data into BlockManager; there may be something wrong in BlockGenerator.
>>>> Could you share the top 50 objects in the heap dump and the thread dump?
>>>>
>>>>
>>>> On Wed, Feb 3, 2016 at 9:52 AM, Udo Fholl <ud...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I recently migrated from 'updateStateByKey' to 'mapWithState' and now
>>>>> I see a huge increase in memory usage. Most of it is a massive
>>>>> "BlockGenerator" (which points to a massive "ArrayBlockingQueue" that
>>>>> in turn points to a huge "Object[]").
>>>>>
>>>>> I'm pretty sure it has to do with my code, but I barely changed
>>>>> anything in the code; I just adapted the state function.
>>>>>
>>>>> Did anyone run into this?
>>>>>
>>>>> Best regards,
>>>>> Udo.
>>>>>
>>>>
>>>>
>>>
>>
>

Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

Posted by Udo Fholl <ud...@gmail.com>.
It does not look like it. Here is the output of "grep -A2 -i waiting
spark_tdump.log":

"RMI TCP Connection(idle)" daemon prio=5 tid=156 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"task-result-getter-1" daemon prio=5 tid=101 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"BLOCK_MANAGER cleanup timer" daemon prio=5 tid=46 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
--
"context-cleaner-periodic-gc" daemon prio=5 tid=69 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"qtp512934838-58" daemon prio=5 tid=58 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"dispatcher-event-loop-3" daemon prio=5 tid=22 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"RMI TCP Connection(idle)" daemon prio=5 tid=150 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"submit-job-thread-pool-0" daemon prio=5 tid=83 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
--
"cw-metrics-publisher" daemon prio=5 tid=90 TIMED_WAITING
  at java.lang.Object.wait(Native Method)
  at
com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable.runOnce(CWPublisherRunnable.java:136)
--
"qtp512934838-57" daemon prio=5 tid=57 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"sparkDriverActorSystem-akka.remote.default-remote-dispatcher-19" daemon
prio=5 tid=193 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
--
"dispatcher-event-loop-2" daemon prio=5 tid=21 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"qtp512934838-56" daemon prio=5 tid=56 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"BROADCAST_VARS cleanup timer" daemon prio=5 tid=47 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
--
"pool-1-thread-1" prio=5 tid=16 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"dispatcher-event-loop-0" daemon prio=5 tid=19 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"RecurringTimer - Kinesis Checkpointer - Worker
localhost:7b412e3a-f7c8-466d-90f1-deaad8656884" daemon prio=5 tid=89
TIMED_WAITING
  at java.lang.Thread.sleep(Native Method)
  at org.apache.spark.util.SystemClock.waitTillTime(Clock.scala:63)
--
"qtp512934838-55" daemon prio=5 tid=55 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"Executor task launch worker-0" daemon prio=5 tid=84 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"qtp512934838-54" daemon prio=5 tid=54 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"pool-28-thread-1" prio=5 tid=92 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"sparkDriverActorSystem-akka.remote.default-remote-dispatcher-18" daemon
prio=5 tid=185 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at
scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
--
"Spark Context Cleaner" daemon prio=5 tid=68 TIMED_WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
--
"qtp512934838-53" daemon prio=5 tid=53 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"SparkListenerBus" daemon prio=5 tid=18 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"block-manager-slave-async-thread-pool-6" daemon prio=5 tid=179
TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"RMI Scheduler(0)" daemon prio=5 tid=151 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"Executor task launch worker-1" daemon prio=5 tid=99 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
--
"block-manager-ask-thread-pool-4" daemon prio=5 tid=180 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"pool-28-thread-2" prio=5 tid=93 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"MAP_OUTPUT_TRACKER cleanup timer" daemon prio=5 tid=45 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
--
"ForkJoinPool-3-worker-5" daemon prio=5 tid=190 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
--
"java-sdk-http-connection-reaper" daemon prio=5 tid=75 TIMED_WAITING
  at java.lang.Thread.sleep(Native Method)
  at
com.amazonaws.http.IdleConnectionReaper.run(IdleConnectionReaper.java:112)
--
"pool-25-thread-1" prio=5 tid=96 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"StreamingListenerBus" daemon prio=5 tid=71 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"JMX server connection timeout 152" daemon prio=5 tid=152 TIMED_WAITING
  at java.lang.Object.wait(Native Method)
  at
com.sun.jmx.remote.internal.ServerCommunicatorAdmin$Timeout.run(ServerCommunicatorAdmin.java:168)
--
"wal-batching-thread-pool-0" daemon prio=5 tid=137 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"block-manager-slave-async-thread-pool-7" daemon prio=5 tid=181
TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"process reaper" daemon prio=10 tid=97 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"block-manager-ask-thread-pool-5" daemon prio=5 tid=183 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"netty-rpc-env-timeout" daemon prio=5 tid=60 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"block-manager-slave-async-thread-pool-8" daemon prio=5 tid=182
TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"RefreshMaterialTask" daemon prio=5 tid=74 TIMED_WAITING
  at java.lang.Object.wait(Native Method)
  at java.util.TimerThread.mainLoop(Timer.java:552)
--
"Kinesis Receiver 0" daemon prio=5 tid=91 TIMED_WAITING
  at java.lang.Thread.sleep(Native Method)
  at
com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:347)
--
"RecurringTimer - BlockGenerator" daemon prio=5 tid=85 TIMED_WAITING
  at java.lang.Thread.sleep(Native Method)
  at org.apache.spark.util.SystemClock.waitTillTime(Clock.scala:63)
--
"driver-heartbeater" daemon prio=5 tid=63 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"sparkDriverActorSystem-scheduler-1" daemon prio=5 tid=32 TIMED_WAITING
  at java.lang.Thread.sleep(Native Method)
  at akka.actor.LightArrayRevolverScheduler.waitNanos(Scheduler.scala:226)
--
"sparkDriverActorSystem-akka.actor.default-dispatcher-4" daemon prio=5
tid=35 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
--
"task-result-getter-2" daemon prio=5 tid=159 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"streaming-job-executor-0" daemon prio=5 tid=95 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
--
"dag-scheduler-event-loop" daemon prio=5 tid=62 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"task-result-getter-3" daemon prio=5 tid=161 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"Finalizer" daemon prio=8 tid=3 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
--
"Thread-19" daemon prio=5 tid=86 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"BatchedWriteAheadLog Writer" daemon prio=5 tid=81 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"heartbeat-receiver-event-loop-thread" daemon prio=5 tid=59 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"RecurringTimer - JobGenerator" daemon prio=5 tid=70 TIMED_WAITING
  at java.lang.Thread.sleep(Native Method)
  at org.apache.spark.util.SystemClock.waitTillTime(Clock.scala:63)
--
"ForkJoinPool-3-worker-7" daemon prio=5 tid=194 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
--
"task-result-getter-0" daemon prio=5 tid=100 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"main" prio=5 tid=1 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"JobScheduler" daemon prio=5 tid=80 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"RecurringTimer - BlockGenerator" daemon prio=5 tid=87 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"dispatcher-event-loop-1" daemon prio=5 tid=20 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"JobGenerator" daemon prio=5 tid=82 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"refresh progress" daemon prio=5 tid=49 TIMED_WAITING
  at java.lang.Object.wait(Native Method)
  at java.util.TimerThread.mainLoop(Timer.java:552)
--
"SPARK_CONTEXT cleanup timer" daemon prio=5 tid=48 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
--
"Timer-0" daemon prio=5 tid=61 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)


On Fri, Feb 5, 2016 at 1:31 AM, Shixiong(Ryan) Zhu <sh...@databricks.com>
wrote:

> I guess it may be a deadlock in BlockGenerator. Could you check it
> yourself?
>
> On Thu, Feb 4, 2016 at 4:14 PM, Udo Fholl <ud...@gmail.com> wrote:
>
>> Thank you for your response.
>>
>> Unfortunately I cannot share a thread dump. What are you looking for
>> exactly?
>>
>> Here is the list of the 50 biggest objects (retained size order,
>> descending):
>>
>> java.util.concurrent.ArrayBlockingQueue#
>> java.lang.Object[]#
>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> org.apache.spark.streaming.receiver.BlockGenerator#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> scala.concurrent.forkjoin.ForkJoinPool#
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue[]#
>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> org.apache.spark.storage.MemoryStore#
>> java.util.LinkedHashMap#
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
>> scala.concurrent.forkjoin.ForkJoinTask[]#
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
>> scala.concurrent.forkjoin.ForkJoinTask[]#
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
>> scala.concurrent.forkjoin.ForkJoinTask[]#
>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> scala.collection.Iterator$$anon$
>> org.apache.spark.InterruptibleIterator#
>> scala.collection.IndexedSeqLike$Elements#
>> scala.collection.mutable.ArrayOps$ofRef#
>> java.lang.Object[]#
>>
>>
>>
>>
>> On Thu, Feb 4, 2016 at 7:14 PM, Shixiong(Ryan) Zhu <
>> shixiong@databricks.com> wrote:
>>
>>> Hey Udo,
>>>
>>> mapWithState usually uses much more memory than updateStateByKey since
>>> it caches the states in memory.
>>>
>>> However, from your description, it looks like BlockGenerator cannot push
>>> data into BlockManager; there may be something wrong in BlockGenerator.
>>> Could you share the top 50 objects in the heap dump and the thread dump?
>>>
>>>
>>> On Wed, Feb 3, 2016 at 9:52 AM, Udo Fholl <ud...@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I recently migrated from 'updateStateByKey' to 'mapWithState' and now I
>>>> see a huge increase in memory usage. Most of it is a massive
>>>> "BlockGenerator" (which points to a massive "ArrayBlockingQueue" that in
>>>> turn points to a huge "Object[]").
>>>>
>>>> I'm pretty sure it has to do with my code, but I barely changed
>>>> anything in the code; I just adapted the state function.
>>>>
>>>> Did anyone run into this?
>>>>
>>>> Best regards,
>>>> Udo.
>>>>
>>>
>>>
>>
>

Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
I guess it may be a deadlock in BlockGenerator. Could you check it
yourself?
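
One quick, generic way to check for a lock-level deadlock (a sketch only, not
Spark-specific) is to ask the JVM itself from inside the driver process:

  import java.lang.management.ManagementFactory

  // Reports threads deadlocked on object monitors or ownable synchronizers.
  val bean = ManagementFactory.getThreadMXBean
  Option(bean.findDeadlockedThreads()) match {
    case Some(ids) =>
      bean.getThreadInfo(ids, true, true).foreach(println)
    case None =>
      println("No deadlocked threads detected")
  }

A full thread dump (for example from jstack) carries the same information.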

On Thu, Feb 4, 2016 at 4:14 PM, Udo Fholl <ud...@gmail.com> wrote:

> Thank you for your response.
>
> Unfortunately I cannot share a thread dump. What are you looking for
> exactly?
>
> Here is the list of the 50 biggest objects (retained size order,
> descending):
>
> java.util.concurrent.ArrayBlockingQueue#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> scala.concurrent.forkjoin.ForkJoinPool#
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.storage.MemoryStore#
> java.util.LinkedHashMap#
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
> scala.concurrent.forkjoin.ForkJoinTask[]#
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
> scala.concurrent.forkjoin.ForkJoinTask[]#
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
> scala.concurrent.forkjoin.ForkJoinTask[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> scala.collection.Iterator$$anon$
> org.apache.spark.InterruptibleIterator#
> scala.collection.IndexedSeqLike$Elements#
> scala.collection.mutable.ArrayOps$ofRef#
> java.lang.Object[]#
>
>
>
>
> On Thu, Feb 4, 2016 at 7:14 PM, Shixiong(Ryan) Zhu <
> shixiong@databricks.com> wrote:
>
>> Hey Udo,
>>
>> mapWithState usually uses much more memory than updateStateByKey since it
>> caches the states in memory.
>>
>> However, from your description, it looks like BlockGenerator cannot push
>> data into BlockManager; there may be something wrong in BlockGenerator.
>> Could you share the top 50 objects in the heap dump and the thread dump?
>>
>>
>> On Wed, Feb 3, 2016 at 9:52 AM, Udo Fholl <ud...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I recently migrated from 'updateStateByKey' to 'mapWithState' and now I
>>> see a huge increase in memory usage. Most of it is a massive
>>> "BlockGenerator" (which points to a massive "ArrayBlockingQueue" that in
>>> turn points to a huge "Object[]").
>>>
>>> I'm pretty sure it has to do with my code, but I barely changed anything
>>> in the code; I just adapted the state function.
>>>
>>> Did anyone run into this?
>>>
>>> Best regards,
>>> Udo.
>>>
>>
>>
>

Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

Posted by Udo Fholl <ud...@gmail.com>.
Thank you for your response.

Unfortunately I cannot share a thread dump. What are you looking for
exactly?

Here is the list of the 50 biggest objects (retained size order,
descending):

java.util.concurrent.ArrayBlockingQueue#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
scala.concurrent.forkjoin.ForkJoinPool#
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.storage.MemoryStore#
java.util.LinkedHashMap#
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
scala.concurrent.forkjoin.ForkJoinTask[]#
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
scala.concurrent.forkjoin.ForkJoinTask[]#
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
scala.concurrent.forkjoin.ForkJoinTask[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
scala.collection.Iterator$$anon$
org.apache.spark.InterruptibleIterator#
scala.collection.IndexedSeqLike$Elements#
scala.collection.mutable.ArrayOps$ofRef#
java.lang.Object[]#



On Thu, Feb 4, 2016 at 7:14 PM, Shixiong(Ryan) Zhu <sh...@databricks.com>
wrote:

> Hey Udo,
>
> mapWithState usually uses much more memory than updateStateByKey since it
> caches the states in memory.
>
> However, from your description, it looks like BlockGenerator cannot push
> data into BlockManager; there may be something wrong in BlockGenerator.
> Could you share the top 50 objects in the heap dump and the thread dump?
>
>
> On Wed, Feb 3, 2016 at 9:52 AM, Udo Fholl <ud...@gmail.com> wrote:
>
>> Hi all,
>>
>> I recently migrated from 'updateStateByKey' to 'mapWithState' and now I
>> see a huge increase in memory usage. Most of it is a massive
>> "BlockGenerator" (which points to a massive "ArrayBlockingQueue" that in
>> turn points to a huge "Object[]").
>>
>> I'm pretty sure it has to do with my code, but I barely changed anything
>> in the code; I just adapted the state function.
>>
>> Did anyone run into this?
>>
>> Best regards,
>> Udo.
>>
>
>

Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Hey Udo,

mapWithState usually uses much more memory than updateStateByKey since it
caches the states in memory.

However, from your description, it looks like BlockGenerator cannot push data
into BlockManager; there may be something wrong in BlockGenerator. Could you
share the top 50 objects in the heap dump and the thread dump?
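
As a side note on keeping that state bounded: the StateSpec can be given an
idle timeout so stale keys are eventually evicted, and receiver-side block
buildup can be throttled with the standard rate-limiting settings. This is a
rough sketch only; the timeout, partition count, and rate are illustrative
values, and 'mappingFunc'/'pairs' stand for whatever function and stream are
actually in use:

  import org.apache.spark.streaming.{Minutes, StateSpec}

  // Evict keys that receive no new data for 30 minutes (example value).
  val spec = StateSpec.function(mappingFunc _)
    .timeout(Minutes(30))
    .numPartitions(32)  // example partitioning for the state RDDs
  val stateStream = pairs.mapWithState(spec)

  // Receiver throttling, set in SparkConf or spark-defaults.conf, e.g.:
  //   spark.streaming.backpressure.enabled=true
  //   spark.streaming.receiver.maxRate=1000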


On Wed, Feb 3, 2016 at 9:52 AM, Udo Fholl <ud...@gmail.com> wrote:

> Hi all,
>
> I recently migrated from 'updateStateByKey' to 'mapWithState' and now I
> see a huge increase in memory usage. Most of it is a massive
> "BlockGenerator" (which points to a massive "ArrayBlockingQueue" that in
> turn points to a huge "Object[]").
>
> I'm pretty sure it has to do with my code, but I barely changed anything
> in the code; I just adapted the state function.
>
> Did anyone run into this?
>
> Best regards,
> Udo.
>