Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2016/10/13 10:02:20 UTC
[jira] [Commented] (SPARK-17888) Memory leak in streaming driver when use SparkSQL in Streaming
[ https://issues.apache.org/jira/browse/SPARK-17888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15571476#comment-15571476 ]
Sean Owen commented on SPARK-17888:
-----------------------------------
What makes you believe there is a leak? If you're not running out of memory, I don't think there is an issue here.
> Memory leak in streaming driver when use SparkSQL in Streaming
> --------------------------------------------------------------
>
> Key: SPARK-17888
> URL: https://issues.apache.org/jira/browse/SPARK-17888
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.6.2
> Environment: scala 2.10.4
> java 1.7.0_71
> Reporter: weilin.chen
> Labels: leak, memory
>
> Hi
> I have a small Spark 1.5 program that receives data from a publisher via Spark Streaming and processes the received data with Spark SQL. Over time I noticed a memory leak in the driver, so I upgraded to Spark 1.6.2, but the situation did not change.
> Here is the code:
> {quote}
> val lines = ssc.receiverStream(new RReceiver("10.0.200.15", 6380, "subresult"))
> val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
> val logs = jsonf.map(data => LogStashV1(data("message").toString, data("path").toString, data("host").toString, data("lineno").toString.toDouble, data("timestamp").toString))
> logs.foreachRDD( rdd => {
>   import sqc.implicits._
>   rdd.toDF.registerTempTable("logstash")
>   val sqlreport0 = sqc.sql("SELECT message, COUNT(message) AS host_c, SUM(lineno) AS line_a FROM logstash WHERE path = '/var/log/system.log' AND lineno > 70 GROUP BY message ORDER BY host_c DESC LIMIT 100")
>   sqlreport0.map(t => AlertMsg(t(0).toString, t(1).toString.toInt, t(2).toString.toDouble)).collect().foreach(println)
> })
> {quote}
> jmap information:
> {quote}
> num    #instances     #bytes  class name
> ----------------------------------------------
>   1:      34819   72711952  [B
>   2:    2297557   66010656  [C
>   3:    2296294   55111056  java.lang.String
>   4:    1063491   42539640  org.apache.spark.scheduler.AccumulableInfo
>   5:    1251001   40032032  scala.collection.immutable.HashMap$HashMap1
>   6:    1394364   33464736  java.lang.Long
>   7:    1102516   26460384  scala.collection.immutable.$colon$colon
>   8:    1058202   25396848  org.apache.spark.sql.execution.metric.LongSQLMetricValue
>   9:    1266499   20263984  scala.Some
>  10:     124052   15889104  <methodKlass>
>  11:     124052   15269568  <constMethodKlass>
>  12:      11350   12082432  <constantPoolKlass>
>  13:      11350   11692880  <instanceKlassKlass>
>  14:      96682   10828384  org.apache.spark.executor.TaskMetrics
>  15:     233481    9505896  [Lscala.collection.immutable.HashMap;
>  16:      96682    6961104  org.apache.spark.scheduler.TaskInfo
>  17:       9589    6433312  <constantPoolCacheKlass>
>  18:     233000    5592000  scala.collection.immutable.HashMap$HashTrieMap
>  19:      96200    5387200  org.apache.spark.executor.ShuffleReadMetrics
>  20:     113381    3628192  scala.collection.mutable.ListBuffer
>  21:       7252    2891792  <methodDataKlass>
>  22:     117073    2809752  scala.collection.mutable.DefaultEntry
> {quote}
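> The histogram above is dominated by driver-side bookkeeping classes (AccumulableInfo, TaskMetrics, LongSQLMetricValue) that Spark's UI listeners retain for completed jobs, stages, and SQL executions. One thing worth trying before calling it a leak is bounding that retention via the UI settings in spark-defaults.conf. A sketch (the values below are illustrative, not tuned; the defaults in 1.6 keep far more history):
> {quote}
> # spark-defaults.conf -- illustrative retention limits for a long-running streaming driver
> spark.ui.retainedJobs               200
> spark.ui.retainedStages             200
> spark.sql.ui.retainedExecutions     50
> spark.streaming.ui.retainedBatches  100
> {quote}
> If the jmap counts for these classes stabilize after lowering the limits, the growth is listener/UI history rather than a leak in the streaming or SQL code itself.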
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org