Posted to user@spark.apache.org by Lin Zhao <li...@exabeam.com> on 2016/01/15 01:06:16 UTC

Spark Streaming: custom actor receiver losing vast majority of data

Hi,

I'm testing Spark Streaming with an actor receiver. The actor keeps calling store() to save a pair to Spark.

Once the job is launched, everything looks good on the UI: millions of events get through every batch. However, I added logging to the first step and found that only 20 or 40 events per batch actually reach the first mapper. Any idea what might be causing this?

I also log in the custom receiver before the "store()" call, and it really is calling this function millions of times.

The receiver definition looks like:


val stream = ssc.actorStream[(String, Message)](MessageRetriever.props("message-retriever",
  mrSections.head, conf, flowControlDef, None, None), "Martini",
  StorageLevel.MEMORY_ONLY_SER)


The job looks like:

stream.map { pair =>
  logger.info(s"before pipeline key=${pair._1}") // Only a handful gets logged although there are over 1 million in a batch
  pair._2
}.flatMap { m =>
  // Event Builder
  logger.info(s"event builder thread-id=${Thread.currentThread().getId} user=${m.fields.getOrElse('user, "NA")}")
  ebHelper(m)
}.map { e =>
  // Event Normalizer
  logger.info(s"normalizer thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
  DefaultEventNormalizer.normalizeFields(e)
}.map { e =>
  logger.info(s"resolver thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
  resolver(e)
}.flatMap { e =>
  // Event Discarder
  logger.info(s"discarder thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
  discarder(e)
}.map { e =>
  ep(e)
}

Re: Spark Streaming: custom actor receiver losing vast majority of data

Posted by Lin Zhao <li...@exabeam.com>.
Hi Shixiong,

Just figured it out. I was using .print() as the output operation, which seems to stop processing the batch once 10 events have gone through. I changed it to a no-op foreachRDD and it works.
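
This matches how print() behaves as far as I know: it only fetches the first few records of each batch (a take(11) on the RDD, to show 10 records plus a "more" marker), and because each task pulls records through the map functions lazily, the logging upstream stops as soon as enough records have come out. The sketch below reproduces that effect in plain Scala with an Iterator rather than Spark; the object and method names are made up for illustration.

```scala
object LazyTakeSketch {
  // Counts how often the map body runs when only the first `k` results
  // of a lazily mapped sequence of `n` records are consumed.
  def mapCallsWhenTaking(n: Int, k: Int): Int = {
    var calls = 0
    val mapped = Iterator.range(0, n).map { x => calls += 1; x * 2 }
    mapped.take(k).foreach(_ => ()) // stops pulling after k elements
    calls
  }

  // Same pipeline, but every result is consumed, so the map body
  // runs once per record.
  def mapCallsWhenConsumingAll(n: Int): Int = {
    var calls = 0
    val mapped = Iterator.range(0, n).map { x => calls += 1; x * 2 }
    mapped.foreach(_ => ())
    calls
  }

  def main(args: Array[String]): Unit = {
    println(mapCallsWhenTaking(1000000, 11))
    println(mapCallsWhenConsumingAll(1000000))
  }
}
```

In the streaming job, the counterpart of the second method would be an output operation that touches every record of the final DStream, for example a foreachRDD whose body calls rdd.foreach(_ => ()) or rdd.count().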

Thanks for jumping in to help me.

From: "Shixiong(Ryan) Zhu" <sh...@databricks.com>
Date: Thursday, January 14, 2016 at 4:41 PM
To: Lin Zhao <li...@exabeam.com>
Cc: user <us...@spark.apache.org>
Subject: Re: Spark Streaming: custom actor receiver losing vast majority of data

Could you post the code of MessageRetriever? And by the way, could you post a screenshot of the tasks for a batch and check the input size of these tasks? Considering there are so many events, there should be a lot of blocks as well as a lot of tasks.

Re: Spark Streaming: custom actor receiver losing vast majority of data

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Could you post the code of MessageRetriever? And by the way, could you
post a screenshot of the tasks for a batch and check the input size of
these tasks? Considering there are so many events, there should be a lot
of blocks as well as a lot of tasks.
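
To put a rough number on "a lot of blocks": each receiver cuts its input into one block per block interval, and each block becomes one partition (and so one task) of the batch's RDD. A back-of-the-envelope sketch, assuming the default spark.streaming.blockInterval of 200 ms (two of the block ids in the logs, ...879000 and ...879200, are 200 ms apart, which fits) and the 20 second batch duration mentioned in this thread; the object name is hypothetical.

```scala
object BlocksPerBatchSketch {
  // Expected number of receiver blocks (and hence first-stage tasks)
  // per batch for a single receiver.
  def blocksPerBatch(batchMillis: Long, blockIntervalMillis: Long): Long = {
    require(blockIntervalMillis > 0, "block interval must be positive")
    batchMillis / blockIntervalMillis
  }

  def main(args: Array[String]): Unit = {
    // 20 s batch (from the thread), 200 ms block interval (assumed default).
    println(blocksPerBatch(20000L, 200L))
  }
}
```

With those numbers a batch would be expected to have on the order of 100 tasks in its first stage, so a batch whose job finishes after one or two tasks is a hint that something is cutting the evaluation short.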

On Thu, Jan 14, 2016 at 4:34 PM, Lin Zhao <li...@exabeam.com> wrote:

> Hi Shixiong,
>
> I tried this but it still happens. If it helps, it's 1.6.0 and runs on
> YARN. Batch duration is 20 seconds.
>

Re: Spark Streaming: custom actor receiver losing vast majority of data

Posted by Lin Zhao <li...@exabeam.com>.
Hi Shixiong,

I tried this but it still happens. If it helps, this is Spark 1.6.0 running on YARN. The batch duration is 20 seconds.

Some logs that seem related to the block manager:


16/01/15 00:31:25 INFO receiver.BlockGenerator: Pushed block input-0-1452817873000
16/01/15 00:31:27 INFO storage.MemoryStore: Block input-0-1452817879000 stored as bytes in memory (estimated size 60.1 MB, free 1563.4 MB)
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 31
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 30
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 29
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 28
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 27
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 26
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 25
16/01/15 00:31:28 INFO receiver.BlockGenerator: Pushed block input-0-1452817879000
16/01/15 00:31:28 INFO context.TimeSpanHDFSReader: descr="Waiting to read [win] message file(s) for 2015-12-17T21:00:00.000." module=TIMESPAN_HDFS_READER
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 38
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 37
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 36
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 35
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 34
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 33
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 32
16/01/15 00:31:29 INFO context.MessageRetriever: descr="Processed 4,636,461 lines (80 s); 0 event/s (42,636 incoming msg/s) at 2015-12-17T21" module=MESSAGE_RETRIEVER
16/01/15 00:31:30 INFO storage.MemoryStore: Block input-0-1452817879200 stored as bytes in memory (estimated size 93.3 MB, free 924.9 MB)
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 45
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 44
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 43
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 42
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 41
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 40
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 39

From: "Shixiong(Ryan) Zhu" <sh...@databricks.com>
Date: Thursday, January 14, 2016 at 4:13 PM
To: Lin Zhao <li...@exabeam.com>
Cc: user <us...@spark.apache.org>
Subject: Re: Spark Streaming: custom actor receiver losing vast majority of data

MEMORY_AND_DISK_SER_2

Re: Spark Streaming: custom actor receiver losing vast majority of data

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Could you change MEMORY_ONLY_SER to MEMORY_AND_DISK_SER_2 and see if this
still happens? It may be because you don't have enough memory to cache the
events.
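
For reference, a minimal sketch of that change against the Spark 1.6 actorStream API. The helper and its parameter names are hypothetical; only the storage level differs from the call in the original post. With MEMORY_AND_DISK_SER_2, serialized blocks that do not fit in memory spill to disk and are replicated to a second executor. It needs spark-streaming 1.6 and Akka on the classpath to compile.

```scala
import akka.actor.Props
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import scala.reflect.ClassTag

object ActorStreamStorageSketch {
  // Hypothetical helper: creates the actor-based input stream with a
  // storage level that can spill to disk instead of memory only.
  def spillableActorStream[T: ClassTag](
      ssc: StreamingContext,
      receiverProps: Props,
      receiverName: String): ReceiverInputDStream[T] =
    ssc.actorStream[T](receiverProps, receiverName, StorageLevel.MEMORY_AND_DISK_SER_2)
}
```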
