Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2019/06/19 13:58:00 UTC

[jira] [Resolved] (SPARK-28062) HuberAggregator copies coefficients vector every time an instance is added

     [ https://issues.apache.org/jira/browse/SPARK-28062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-28062.
-------------------------------
       Resolution: Fixed
    Fix Version/s: 3.0.0

Issue resolved by pull request 24880
[https://github.com/apache/spark/pull/24880]

> HuberAggregator copies coefficients vector every time an instance is added
> --------------------------------------------------------------------------
>
>                 Key: SPARK-28062
>                 URL: https://issues.apache.org/jira/browse/SPARK-28062
>             Project: Spark
>          Issue Type: Bug
>          Components: ML
>    Affects Versions: 3.0.0
>            Reporter: Andrew Crosby
>            Assignee: Andrew Crosby
>            Priority: Major
>             Fix For: 3.0.0
>
>
> Every time an instance is added to the HuberAggregator, a dense copy of the coefficients vector is created (see the code snippet below). This causes a performance degradation that is particularly severe when the instances have long, sparse feature vectors.
> {code:scala}
> def add(instance: Instance): HuberAggregator = {
>     instance match { case Instance(label, weight, features) =>
>       require(numFeatures == features.size, s"Dimensions mismatch when adding new sample." +
>         s" Expecting $numFeatures but got ${features.size}.")
>       require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
>       if (weight == 0.0) return this
>       val localFeaturesStd = bcFeaturesStd.value
>       // Allocates and fills a fresh dense array on every call to add():
>       val localCoefficients = bcParameters.value.toArray.slice(0, numFeatures)
>       val localGradientSumArray = gradientSumArray
>       // Snip
>     }
> }
> {code}
> The LeastSquaresAggregator class avoids this performance issue by storing such reused values in transient lazy class variables. Applying the same approach to HuberAggregator gives a significant speed boost; a sketch of the pattern follows the timing results below. Running the script at the end of this description locally on my machine gives the following timing results:
> {noformat}
> Current implementation: 
>     Time(s): 540.1439919471741
>     Iterations: 26
>     Intercept: 0.518109382890512
>     Coefficients: [0.0, -0.2516936902000245, 0.0, 0.0, -0.19633887469839809, 0.0, -0.39565545053893925, 0.0, -0.18617574426698882, 0.0478922416670529]
> Modified implementation to match LeastSquaresAggregator:
>     Time(s): 46.82946586608887
>     Iterations: 26
>     Intercept: 0.5181093828893774
>     Coefficients: [0.0, -0.25169369020031357, 0.0, 0.0, -0.1963388746927919, 0.0, -0.3956554505389966, 0.0, -0.18617574426702874, 0.04789224166878518]
> {noformat}
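> For illustration, here is a minimal sketch of that pattern (the names are illustrative and may not match the exact change merged in pull request 24880): the materialized copy of the coefficients is held in a @transient lazy val, so it is built once per deserialized aggregator rather than once per added instance.
> {code:scala}
> // Sketch only: materialize the coefficients once per task via a transient
> // lazy val instead of re-slicing the broadcast vector on every add().
> @transient private lazy val localCoefficients: Array[Double] =
>   bcParameters.value.toArray.slice(0, numFeatures)
>
> def add(instance: Instance): HuberAggregator = {
>   instance match { case Instance(label, weight, features) =>
>     // ... validation as before ...
>     val coefficients = localCoefficients  // reuses the cached copy
>     // Snip
>   }
>   this
> }
> {code}
> Because the val is transient it is not serialized into the task closure; each executor re-materializes it lazily on first use, so the copy cost is paid once per task rather than once per training instance.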
> {code:python}
> from random import random, randint, seed
> import time
>
> from pyspark.ml.feature import OneHotEncoder
> from pyspark.ml.regression import LinearRegression
> from pyspark.sql import SparkSession
>
> seed(0)
> spark = SparkSession.builder.appName('huber-speed-test').getOrCreate()
>
> # A high-cardinality category column: one-hot encoding it yields long sparse
> # feature vectors, the worst case for the per-instance copy in add().
> df = spark.createDataFrame([[randint(0, 100000), random()] for i in range(100000)], ["category", "target"])
> ohe = OneHotEncoder(inputCols=["category"], outputCols=["encoded_category"]).fit(df)
> lr = LinearRegression(featuresCol="encoded_category", labelCol="target", loss="huber", regParam=1.0)
>
> start = time.time()
> model = lr.fit(ohe.transform(df))
> end = time.time()
>
> print("Time(s): " + str(end - start))
> print("Iterations: " + str(model.summary.totalIterations))
> print("Intercept: " + str(model.intercept))
> print("Coefficients: " + str(list(model.coefficients)[0:10]))
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org