You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "zhengruifeng (JIRA)" <ji...@apache.org> on 2017/01/22 07:58:26 UTC
[jira] [Commented] (SPARK-19208) MultivariateOnlineSummarizer
perfermence optimization
[ https://issues.apache.org/jira/browse/SPARK-19208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833345#comment-15833345 ]
zhengruifeng commented on SPARK-19208:
--------------------------------------
After diving into sparksql's udaf, I design the new api like this:
new MultivariateOnlineSummarizer in org.apache.spark.ml.stat
{code}
class MultivariateOnlineSummarizer(private var metrics: Seq[String]) extends UserDefinedAggregateFunction {
def setMetrics(metrics: Seq[String]) = ...
def setMetrics(metric: String, others: String*) = ...
override def inputSchema: StructType = new StructType().add("weight", DoubleType).add("features", new VectorUDT)
override def bufferSchema: StructType = ...
override def dataType: DataType = DataTypes.createMapType(StringType, new VectorUDT)
override def deterministic: Boolean = true
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = ...
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ...
override def evaluate(buffer: Row): Vector = ...
}
{code}
usage examples:
{code}
// in MinMaxScaler
val maxAbsAgg = new MultivariateOnlineSummarizer().setMetrics("min", "max")
val summary = dataset.groupBy().agg(maxAbsAgg(col("features"), col("weight")))
summary.show
+-----------------------------+
|multivariateonlinesummarizer(features, weight)|
+-----------------------------+
| Map(min -> [1.0,0...|
+-----------------------------+
summary.first
res2: org.apache.spark.sql.Row = [Map(min -> [1.0,0.2,0.2], max -> [1.0,0.2,0.2])]
val result = summary.first.getAs[Map[String,Vector]](0)
val min: Vector = result("min")
val max: Vector = result("max")
// in LinearRegression
val featuresAgg = new MultivariateOnlineSummarizer().setMetrics("mean", "variance")
val labelAgg = new MultivariateOnlineSummarizer().setMetrics("mean", "variance")
val result = dataset.map{...}.groupBy().agg(featuresAgg(col("features"), col("weight")), labelAgg(col("labelVec"), col("weight"))).first
val featuresMetrics = result.getAs[Map[String,Vector]](0)
val labelMetrics = result.getAs[Map[String,Vector]](1)
{code}
I have not found a way to output multi columns in udaf, so I use {{Map[String,Vector]}} as the output type temporarily. If there is some way, I'll be happy to modify this place.
> MultivariateOnlineSummarizer perfermence optimization
> -----------------------------------------------------
>
> Key: SPARK-19208
> URL: https://issues.apache.org/jira/browse/SPARK-19208
> Project: Spark
> Issue Type: Improvement
> Components: ML
> Reporter: zhengruifeng
> Attachments: Tests.pdf, WechatIMG2621.jpeg
>
>
> Now, {{MaxAbsScaler}} and {{MinMaxScaler}} are using {{MultivariateOnlineSummarizer}} to compute the min/max.
> However {{MultivariateOnlineSummarizer}} will also compute extra unused statistics. It slows down the task, moreover it is more prone to cause OOM.
> For example:
> env : --driver-memory 4G --executor-memory 1G --num-executors 4
> data: [http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#kdd2010%20(bridge%20to%20algebra)] 748401 instances, and 29,890,095 features
> {{MaxAbsScaler.fit}} fails because of OOM
> {{MultivariateOnlineSummarizer}} maintains 8 arrays:
> {code}
> private var currMean: Array[Double] = _
> private var currM2n: Array[Double] = _
> private var currM2: Array[Double] = _
> private var currL1: Array[Double] = _
> private var totalCnt: Long = 0
> private var totalWeightSum: Double = 0.0
> private var weightSquareSum: Double = 0.0
> private var weightSum: Array[Double] = _
> private var nnz: Array[Long] = _
> private var currMax: Array[Double] = _
> private var currMin: Array[Double] = _
> {code}
> For {{MaxAbsScaler}}, only 1 array is needed (max of abs value)
> For {{MinMaxScaler}}, only 3 arrays are needed (max, min, nnz)
> After modication in the pr, the above example run successfully.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org