Posted to user@spark.apache.org by Lin Zhao <li...@exabeam.com> on 2016/01/15 01:06:16 UTC
Spark Streaming: custom actor receiver losing vast majority of data
Hi,
I'm testing Spark Streaming with an actor receiver. The actor keeps calling store() to save a pair to Spark.
Once the job is launched, everything looks good on the UI: millions of events get through every batch. However, I added logging to the first step and found that only 20 or 40 events per batch actually reach the first mapper. Any idea what might be causing this?
I also log in the custom receiver just before the store() call, and it really is calling this function millions of times.
The receiver definition looks like:
    val stream = ssc.actorStream[(String, Message)](
      MessageRetriever.props("message-retriever", mrSections.head, conf, flowControlDef, None, None),
      "Martini",
      StorageLevel.MEMORY_ONLY_SER)
The job looks like:
    stream.map { pair =>
      // Only a handful get logged although there are over 1 million in a batch
      logger.info(s"before pipeline key=${pair._1}")
      pair._2
    }.flatMap { m =>
      // Event Builder
      logger.info(s"event builder thread-id=${Thread.currentThread().getId} user=${m.fields.getOrElse('user, "NA")}")
      ebHelper(m)
    }.map { e =>
      // Event Normalizer
      logger.info(s"normalizer thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
      DefaultEventNormalizer.normalizeFields(e)
    }.map { e =>
      logger.info(s"resolver thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
      resolver(e)
    }.flatMap { e =>
      // Event Discarder
      logger.info(s"discarder thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
      discarder(e)
    }.map { e =>
      ep(e)
    }
Re: Spark Streaming: custom actor receiver losing vast majority of data
Posted by Lin Zhao <li...@exabeam.com>.
Hi Shixiong,
Just figured it out. I was using .print() as the output operation, which seems to stop processing a batch once 10 elements have come through. I changed it to a no-op foreachRDD and it works.
Thanks for jumping in to help me.
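The behaviour described in this message is consistent with lazy evaluation: an output step that only asks for a few results pulls only a few inputs through the upstream map steps. Below is a minimal sketch in plain Scala (no Spark involved), assuming that print() behaves like taking the first few elements of each batch. The names `LazyTakeDemo` and `inputsSeen` are hypothetical and exist only for this illustration.

```scala
object LazyTakeDemo {
  /** Runs a one-stage "pipeline" over `total` inputs and returns how many
    * inputs the map function actually saw.
    * `limit = Some(n)` mimics an output step that only asks for n results;
    * `limit = None` mimics an output step that consumes everything. */
  def inputsSeen(total: Int, limit: Option[Int]): Int = {
    var seen = 0
    // Iterator.map is lazy: the function runs only when an element is pulled.
    val mapped = Iterator.range(0, total).map { x => seen += 1; x }
    limit match {
      case Some(n) => mapped.take(n).toList   // pulls at most n elements
      case None    => mapped.foreach(_ => ()) // pulls every element
    }
    seen
  }

  def main(args: Array[String]): Unit = {
    println(s"take(10): ${inputsSeen(1000000, Some(10))} inputs reached the mapper")
    println(s"consume all: ${inputsSeen(1000000, None)} inputs reached the mapper")
  }
}
```

In a real job the count can be somewhat higher than the number of elements requested, for example when a flatMap step drops some inputs, which may be why 20 or 40 log lines showed up rather than exactly 10.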
From: "Shixiong(Ryan) Zhu" <sh...@databricks.com>
Date: Thursday, January 14, 2016 at 4:41 PM
To: Lin Zhao <li...@exabeam.com>
Cc: user <us...@spark.apache.org>
Subject: Re: Spark Streaming: custom actor receiver losing vast majority of data
Could you post the code of MessageRetriever? And by the way, could you post a screenshot of the tasks for a batch and check the input size of these tasks? Considering there are so many events, there should be a lot of blocks as well as a lot of tasks.
Re: Spark Streaming: custom actor receiver losing vast majority of data
Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Could you post the code of MessageRetriever? And by the way, could you
post a screenshot of the tasks for a batch and check the input size of these
tasks? Considering there are so many events, there should be a lot of
blocks as well as a lot of tasks.
On Thu, Jan 14, 2016 at 4:34 PM, Lin Zhao <li...@exabeam.com> wrote:
> Hi Shixiong,
>
> I tried this but it still happens. If it helps, it's 1.6.0 and runs on
> YARN. Batch duration is 20 seconds.
Re: Spark Streaming: custom actor receiver losing vast majority of data
Posted by Lin Zhao <li...@exabeam.com>.
Hi Shixiong,
I tried this, but it still happens. If it helps, this is Spark 1.6.0 running on YARN. The batch duration is 20 seconds.
Some logs seemingly related to block manager:
16/01/15 00:31:25 INFO receiver.BlockGenerator: Pushed block input-0-1452817873000
16/01/15 00:31:27 INFO storage.MemoryStore: Block input-0-1452817879000 stored as bytes in memory (estimated size 60.1 MB, free 1563.4 MB)
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 31
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 30
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 29
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 28
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 27
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 26
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 25
16/01/15 00:31:28 INFO receiver.BlockGenerator: Pushed block input-0-1452817879000
16/01/15 00:31:28 INFO context.TimeSpanHDFSReader: descr="Waiting to read [win] message file(s) for 2015-12-17T21:00:00.000." module=TIMESPAN_HDFS_READER
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 38
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 37
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 36
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 35
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 34
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 33
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 32
16/01/15 00:31:29 INFO context.MessageRetriever: descr="Processed 4,636,461 lines (80 s); 0 event/s (42,636 incoming msg/s) at 2015-12-17T21" module=MESSAGE_RETRIEVER
16/01/15 00:31:30 INFO storage.MemoryStore: Block input-0-1452817879200 stored as bytes in memory (estimated size 93.3 MB, free 924.9 MB)
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 45
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 44
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 43
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 42
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 41
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 40
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 39
From: "Shixiong(Ryan) Zhu" <sh...@databricks.com>
Date: Thursday, January 14, 2016 at 4:13 PM
To: Lin Zhao <li...@exabeam.com>
Cc: user <us...@spark.apache.org>
Subject: Re: Spark Streaming: custom actor receiver losing vast majority of data
MEMORY_AND_DISK_SER_2
Re: Spark Streaming: custom actor receiver losing vast majority of data
Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Could you change MEMORY_ONLY_SER to MEMORY_AND_DISK_SER_2 and see if this
still happens? It may be because you don't have enough memory to cache the
events.
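To make the difference between the two storage levels concrete, here is a small sketch that prints the flags of each level. It assumes spark-core is on the classpath; `StorageLevelCompare` and `describe` are hypothetical names used only for illustration.

```scala
import org.apache.spark.storage.StorageLevel

object StorageLevelCompare {
  // Summarizes the flags of a storage level in one line.
  def describe(name: String, level: StorageLevel): String =
    s"$name: memory=${level.useMemory} disk=${level.useDisk} " +
      s"deserialized=${level.deserialized} replication=${level.replication}"

  def main(args: Array[String]): Unit = {
    println(describe("MEMORY_ONLY_SER", StorageLevel.MEMORY_ONLY_SER))
    println(describe("MEMORY_AND_DISK_SER_2", StorageLevel.MEMORY_AND_DISK_SER_2))
  }
}
```

The suggested level also allows disk as a backing store and keeps two replicas, so received blocks that do not fit in memory have somewhere else to go.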