You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2016/10/20 08:33:58 UTC

[jira] [Resolved] (SPARK-17888) Memory leak in streaming driver when use SparkSQL in Streaming

     [ https://issues.apache.org/jira/browse/SPARK-17888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-17888.
-------------------------------
    Resolution: Not A Problem

> Memory leak in streaming driver when use SparkSQL in Streaming
> --------------------------------------------------------------
>
>                 Key: SPARK-17888
>                 URL: https://issues.apache.org/jira/browse/SPARK-17888
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.6.2
>         Environment: scala 2.10.4
> java 1.7.0_71
>            Reporter: weilin.chen
>              Labels: leak, memory
>
> Hi
>   I have a little program of spark 1.5, it receive data from a publisher in spark streaming. It will process these received data with spark sql. But when the time goes by I found the memory leak in driver, so i update to spark 1.6.2. But, there is no change in the situation.
> here is the code:
> {quote}
>  val lines = ssc.receiverStream(new RReceiver("10.0.200.15", 6380, "subresult"))
> val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
> val logs = jsonf.map(data => LogStashV1(data("message").toString, data("path").toString, data("host").toString, data("lineno").toString.toDouble, data("timestamp").toString))
> logs.foreachRDD( rdd => { 
>  import sqc.implicits._
>  rdd.toDF.registerTempTable("logstash")
>  val sqlreport0 = sqc.sql("SELECT message, COUNT(message) AS host_c, SUM(lineno) AS line_a FROM logstash WHERE path = '/var/log/system.log' AND lineno > 70 GROUP BY message ORDER BY host_c DESC LIMIT 100")
>  sqlreport0.map(t => AlertMsg(t(0).toString, t(1).toString.toInt, t(2).toString.toDouble)).collect().foreach(println)
> sqlreport0.map(t => AlertMsg(t(0).toString, t(1).toString.toInt, t(2).toString.toDouble)).collect().foreach(println)
>  {quote}
> jmap information:
>  {quote}
>  num     #instances         #bytes  class name
> ----------------------------------------------
>    1:         34819       72711952  [B
>    2:       2297557       66010656  [C
>    3:       2296294       55111056  java.lang.String
>    4:       1063491       42539640  org.apache.spark.scheduler.AccumulableInfo
>    5:       1251001       40032032  scala.collection.immutable.HashMap$HashMap1
>    6:       1394364       33464736  java.lang.Long
>    7:       1102516       26460384  scala.collection.immutable.$colon$colon
>    8:       1058202       25396848  org.apache.spark.sql.execution.metric.LongSQLMetricValue
>    9:       1266499       20263984  scala.Some
>   10:        124052       15889104  <methodKlass>
>   11:        124052       15269568  <constMethodKlass>
>   12:         11350       12082432  <constantPoolKlass>
>   13:         11350       11692880  <instanceKlassKlass>
>   14:         96682       10828384  org.apache.spark.executor.TaskMetrics
>   15:        233481        9505896  [Lscala.collection.immutable.HashMap;
>   16:         96682        6961104  org.apache.spark.scheduler.TaskInfo
>   17:          9589        6433312  <constantPoolCacheKlass>
>   18:        233000        5592000  scala.collection.immutable.HashMap$HashTrieMap
>   19:         96200        5387200  org.apache.spark.executor.ShuffleReadMetrics
>   20:        113381        3628192  scala.collection.mutable.ListBuffer
>   21:          7252        2891792  <methodDataKlass>
>   22:        117073        2809752  scala.collection.mutable.DefaultEntry
>  {quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org