Posted to issues@spark.apache.org by "Sean R. Owen (Jira)" <ji...@apache.org> on 2019/10/27 01:53:00 UTC

[jira] [Commented] (SPARK-27842) Inconsistent results of Statistics.corr() and PearsonCorrelation.computeCorrelationMatrix()

    [ https://issues.apache.org/jira/browse/SPARK-27842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16960498#comment-16960498 ] 

Sean R. Owen commented on SPARK-27842:
--------------------------------------

I'd like to look into it. What numbers do you see in the two runs? Are the differences large or very small?
My guess is that the values are processed in a different order in the two cases because you have a different number of executor slots. In theory that makes no difference; in practice it probably causes slight differences due to floating-point rounding. I don't know enough yet to say whether that's the cause and whether it's resolvable.
I'd also be interested to know whether it occurs on the master branch.
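
To illustrate the kind of effect suspected here, the following is a minimal sketch and not Spark's actual code path. It assumes the data is split into contiguous partitions, each partition is summed separately, and the partial sums are then merged. The class SummationOrderDemo and its methods sumInChunks and nearlyEqual are hypothetical names made up for this example. Because floating-point addition is not associative, changing the number of partitions can change the last digits of the result.

```java
// Illustrative only: the class and method names are hypothetical, not Spark APIs.
class SummationOrderDemo {

    // Sums values as if they were split into `chunks` contiguous partitions:
    // each partition is summed on its own, then the partial sums are merged.
    static double sumInChunks(double[] values, int chunks) {
        double total = 0.0;
        for (int i = 0; i < chunks; i++) {
            int start = (int) ((long) i * values.length / chunks);
            int end = (int) ((long) (i + 1) * values.length / chunks);
            double partial = 0.0;
            for (int j = start; j < end; j++) {
                partial += values[j];
            }
            total += partial;
        }
        return total;
    }

    // Tolerance check: relative for large magnitudes, absolute (relTol) below 1.0.
    static boolean nearlyEqual(double a, double b, double relTol) {
        double scale = Math.max(1.0, Math.max(Math.abs(a), Math.abs(b)));
        return Math.abs(a - b) <= relTol * scale;
    }

    public static void main(String[] args) {
        double[] values = {0.1, 0.2, 0.3};
        double onePartition = sumInChunks(values, 1);   // (0.1 + 0.2) + 0.3
        double twoPartitions = sumInChunks(values, 2);  // 0.1 + (0.2 + 0.3)
        System.out.println(onePartition + " vs " + twoPartitions);
        System.out.println("exactly equal: " + (onePartition == twoPartitions));
        System.out.println("equal within 1e-12: "
                + nearlyEqual(onePartition, twoPartitions, 1e-12));
    }
}
```

If this is what is happening, an exact comparison of the two matrices will fail even though both are correct to within rounding. One option, offered as a suggestion and not a confirmed fix, is to compare the results of Matrix.toArray() element-wise with a small delta, for example via JUnit's Assert.assertArrayEquals(double[], double[], double).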

> Inconsistent results of Statistics.corr() and PearsonCorrelation.computeCorrelationMatrix()
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-27842
>                 URL: https://issues.apache.org/jira/browse/SPARK-27842
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, Spark Core, Windows
>    Affects Versions: 2.3.1
>            Reporter: Peter Nijem
>            Priority: Major
>         Attachments: vectorList.txt
>
>
> Hi,
> I am working with the Spark Java API in local mode (1 node, 8 cores). The Spark versions in my pom.xml are as follows:
> *MLLib*
> _<artifactId>spark-mllib_2.11</artifactId>_
>  _<version>2.3.1</version>_
> *Core*
> _<artifactId>spark-core_2.11</artifactId>_
>  _<version>2.3.1</version>_
> I am experiencing inconsistent results of correlation when starting my Spark application with 8 cores vs 1/2/3 cores.
> I've created a Main class which reads a list of Vectors from a file: 240 Vectors, each of length 226.
> As you can see, I first initialize Spark with local[*], run the correlation, save the Matrix and stop Spark. Then I do the same for local[3].
> I am expecting to get the same matrices on both runs. But this is not the case. The input file is attached.
> I also tried to compute the correlation using PearsonCorrelation.computeCorrelationMatrix(), but I faced the same issue there as well.
>  
> In my work, I depend on the resulting correlation matrix, so the inconsistent results lead to bad results in my application. As a workaround, I am now working with local[1].
>  
>  
> {code:java}
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.mllib.linalg.DenseVector;
> import org.apache.spark.mllib.linalg.Matrix;
> import org.apache.spark.mllib.linalg.Vector;
> import org.apache.spark.mllib.stat.Statistics;
> import org.apache.spark.rdd.RDD;
> import org.junit.Assert;
> import java.io.BufferedReader;
> import java.io.FileReader;
> import java.io.IOException;
> import java.math.RoundingMode;
> import java.text.DecimalFormat;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.List;
> import java.util.stream.Collectors;
> public class TestSparkCorr {
>     private static JavaSparkContext ctx;
>
>     public static void main(String[] args) {
>         List<List<Double>> doublesLists = readInputFile();
>         List<Vector> resultVectors = getVectorsList(doublesLists);
>         //===========================================================================
>         initSpark("*");
>         RDD<Vector> RDD_vector = ctx.parallelize(resultVectors).rdd();
>         Matrix m = Statistics.corr(RDD_vector, "pearson");
>         stopSpark();
>         //===========================================================================
>         initSpark("3");
>         RDD<Vector> RDD_vector_3 = ctx.parallelize(resultVectors).rdd();
>         Matrix m3 = Statistics.corr(RDD_vector_3, "pearson");
>         stopSpark();
>         //===========================================================================
>         Assert.assertEquals(m3, m);
>     }
>
>     private static List<Vector> getVectorsList(List<List<Double>> doublesLists) {
>         List<Vector> resultVectors = new ArrayList<>(doublesLists.size());
>         for (List<Double> vector : doublesLists) {
>             double[] x = new double[vector.size()];
>             for (int i = 0; i < x.length; i++) {
>                 x[i] = vector.get(i);
>             }
>             resultVectors.add(new DenseVector(x));
>         }
>         return resultVectors;
>     }
>
>     private static List<List<Double>> readInputFile() {
>         List<List<Double>> doublesLists = new ArrayList<>();
>         try (BufferedReader reader = new BufferedReader(new FileReader(
>                 ".//output//vectorList.txt"))) {
>             String line = reader.readLine();
>             while (line != null) {
>                 String[] splitLine = line.substring(1, line.length() - 2).split(",");
>                 List<Double> doubles = Arrays.stream(splitLine).map(x -> Double.valueOf(x.trim())).collect(Collectors.toList());
>                 doublesLists.add(doubles);
>                 // read next line
>                 line = reader.readLine();
>             }
>         } catch (IOException e) {
>             e.printStackTrace();
>         }
>         return doublesLists;
>     }
>
>     private static void initSpark(String coresNum) {
>         final SparkConf sparkConf = new SparkConf().setAppName("Span");
>         sparkConf.setMaster(String.format("local[%s]", coresNum));
>         sparkConf.set("spark.ui.enabled", "false");
>         ctx = new JavaSparkContext(sparkConf);
>     }
>
>     private static void stopSpark() {
>         ctx.stop();
>         if (ctx.sc().isStopped()) {
>             ctx = null;
>         }
>     }
> }
> {code}
>  
>  
>  


