Posted to issues@spark.apache.org by "weilin.chen (JIRA)" <ji...@apache.org> on 2016/10/12 09:38:20 UTC

[jira] [Created] (SPARK-17888) Memory leak in streaming driver when using SparkSQL in Streaming

weilin.chen created SPARK-17888:
-----------------------------------

             Summary: Memory leak in streaming driver when using SparkSQL in Streaming
                 Key: SPARK-17888
                 URL: https://issues.apache.org/jira/browse/SPARK-17888
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.6.2
         Environment: scala 2.10.4
java 1.7.0_71
            Reporter: weilin.chen


Hi
  I have a small Spark 1.5 program that receives data from a publisher in Spark Streaming and processes the received data with Spark SQL. As time went by I noticed a memory leak in the driver, so I upgraded to Spark 1.6.2, but the situation did not change.

here is the code:
{quote}
 // Receive JSON records, parse them, and map each one to a LogStashV1 case class.
 val lines = ssc.receiverStream(new RReceiver("10.0.200.15", 6380, "subresult"))
 val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
 val logs = jsonf.map(data => LogStashV1(data("message").toString, data("path").toString, data("host").toString, data("lineno").toString.toDouble, data("timestamp").toString))

 // For every micro-batch, register the RDD as a temp table and run the aggregation with Spark SQL.
 logs.foreachRDD( rdd => {
   import sqc.implicits._
   rdd.toDF.registerTempTable("logstash")
   val sqlreport0 = sqc.sql("SELECT message, COUNT(message) AS host_c, SUM(lineno) AS line_a FROM logstash WHERE path = '/var/log/system.log' AND lineno > 70 GROUP BY message ORDER BY host_c DESC LIMIT 100")
   sqlreport0.map(t => AlertMsg(t(0).toString, t(1).toString.toInt, t(2).toString.toDouble)).collect().foreach(println)
 })
 {quote}
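
For reference, a self-contained sketch of the same per-batch pattern, written the way the Spark Streaming programming guide recommends: the SQLContext is obtained lazily with SQLContext.getOrCreate inside foreachRDD instead of closing over an outer instance. The RReceiver, LogStashV1 and AlertMsg definitions here are assumptions that only mirror the fields used above, not the actual program:

{quote}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.util.parsing.json.JSON

// Assumed case classes, mirroring the fields used in the snippet above.
case class LogStashV1(message: String, path: String, host: String, lineno: Double, timestamp: String)
case class AlertMsg(message: String, hostCount: Int, lineSum: Double)

val conf = new SparkConf().setAppName("StreamingSqlExample")
val ssc = new StreamingContext(conf, Seconds(10))

val lines = ssc.receiverStream(new RReceiver("10.0.200.15", 6380, "subresult"))
val logs = lines
  .map(JSON.parseFull(_))
  .map(_.get.asInstanceOf[Map[String, Any]])
  .map(d => LogStashV1(d("message").toString, d("path").toString, d("host").toString,
    d("lineno").toString.toDouble, d("timestamp").toString))

logs.foreachRDD { rdd =>
  // Reuse one SQLContext per JVM rather than capturing one from the driver closure.
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  rdd.toDF().registerTempTable("logstash")
  val report = sqlContext.sql(
    "SELECT message, COUNT(message) AS host_c, SUM(lineno) AS line_a FROM logstash " +
      "WHERE path = '/var/log/system.log' AND lineno > 70 " +
      "GROUP BY message ORDER BY host_c DESC LIMIT 100")
  report.map(t => AlertMsg(t(0).toString, t(1).toString.toInt, t(2).toString.toDouble))
    .collect()
    .foreach(println)
}

ssc.start()
ssc.awaitTermination()
{quote}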


jmap information:

 {quote}
 num     #instances         #bytes  class name
----------------------------------------------
   1:         34819       72711952  [B
   2:       2297557       66010656  [C
   3:       2296294       55111056  java.lang.String
   4:       1063491       42539640  org.apache.spark.scheduler.AccumulableInfo
   5:       1251001       40032032  scala.collection.immutable.HashMap$HashMap1
   6:       1394364       33464736  java.lang.Long
   7:       1102516       26460384  scala.collection.immutable.$colon$colon
   8:       1058202       25396848  org.apache.spark.sql.execution.metric.LongSQLMetricValue
   9:       1266499       20263984  scala.Some
  10:        124052       15889104  <methodKlass>
  11:        124052       15269568  <constMethodKlass>
  12:         11350       12082432  <constantPoolKlass>
  13:         11350       11692880  <instanceKlassKlass>
  14:         96682       10828384  org.apache.spark.executor.TaskMetrics
  15:        233481        9505896  [Lscala.collection.immutable.HashMap;
  16:         96682        6961104  org.apache.spark.scheduler.TaskInfo
  17:          9589        6433312  <constantPoolCacheKlass>
  18:        233000        5592000  scala.collection.immutable.HashMap$HashTrieMap
  19:         96200        5387200  org.apache.spark.executor.ShuffleReadMetrics
  20:        113381        3628192  scala.collection.mutable.ListBuffer
  21:          7252        2891792  <methodDataKlass>
  22:        117073        2809752  scala.collection.mutable.DefaultEntry
 {quote}
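
The classes at the top of the histogram (AccumulableInfo, TaskMetrics, LongSQLMetricValue, TaskInfo) are driver-side bookkeeping for per-task and per-query metrics shown in the web UI, so one thing worth ruling out is how much completed job/execution data the UI listeners retain. A hedged workaround sketch, assuming the default retention settings are in effect; these properties exist in Spark 1.6, but whether lowering them fully stops the growth for this report is an assumption:

{quote}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("StreamingSqlExample")
  // Retain fewer completed jobs/stages in the driver's job progress listener.
  .set("spark.ui.retainedJobs", "200")
  .set("spark.ui.retainedStages", "200")
  // Retain fewer completed SQL executions in the SQL listener.
  .set("spark.sql.ui.retainedExecutions", "50")
{quote}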


