Posted to issues@spark.apache.org by "Peter Nijem (JIRA)" <ji...@apache.org> on 2019/06/13 06:01:00 UTC

[jira] [Commented] (SPARK-27842) Inconsistent results of Statistics.corr() and PearsonCorrelation.computeCorrelationMatrix()

    [ https://issues.apache.org/jira/browse/SPARK-27842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16862739#comment-16862739 ] 

Peter Nijem commented on SPARK-27842:
-------------------------------------

[~hyukjin.kwon] do you need any further input from my side?

If so, please let me know and I will be glad to help.

 

Peter

> Inconsistent results of Statistics.corr() and PearsonCorrelation.computeCorrelationMatrix()
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-27842
>                 URL: https://issues.apache.org/jira/browse/SPARK-27842
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, Spark Core, Windows
>    Affects Versions: 2.3.1
>            Reporter: Peter Nijem
>            Priority: Major
>         Attachments: vectorList.txt
>
>
> Hi,
> I am working with the Spark Java API in local mode (1 node, 8 cores). The Spark versions in my pom.xml are as follows:
> *MLLib*
> {code:xml}
> <artifactId>spark-mllib_2.11</artifactId>
> <version>2.3.1</version>
> {code}
> *Core*
> {code:xml}
> <artifactId>spark-core_2.11</artifactId>
> <version>2.3.1</version>
> {code}
> I am experiencing inconsistent correlation results when starting my Spark application with 8 cores vs. 1, 2, or 3 cores.
> I've created a Main class which reads a list of Vectors from a file: 240 Vectors, each of length 226.
> As you can see in the code below, I first initialize Spark with local[*], run the correlation, save the matrix, and stop Spark. Then I do the same with local[3].
> I am expecting to get the same matrix on both runs, but this is not the case. The input file is attached.
> I tried to compute the correlation using PearsonCorrelation.computeCorrelationMatrix(), but I ran into the same issue there as well.
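> A possible explanation (my assumption; it is not confirmed anywhere in this ticket) is that with more than one partition the per-partition partial sums are combined in a run-dependent order, and double addition is not associative. The class below is a minimal, self-contained illustration of that effect; FpOrderDemo is a hypothetical name and is not part of the attached reproducer:
> {code:java}
> // Hypothetical demo: the grouping of double additions changes the result.
> // Partition-dependent aggregation order could introduce differences of a
> // similar kind in the accumulated statistics behind Statistics.corr().
> public class FpOrderDemo {
>     public static void main(String[] args) {
>         double a = 1e16, b = -1e16, c = 1.0;
>         System.out.println((a + b) + c); // prints 1.0
>         System.out.println(a + (b + c)); // prints 0.0: c is absorbed into b
>     }
> }
> {code}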
>  
> In my work, I depend on the resulting correlation matrix, so the inconsistent results lead to bad results in my application. As a workaround, I am now running with local[1].
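> An alternative workaround, which I have not verified as thoroughly as local[1]: forcing the data into a single partition should make the aggregation order deterministic regardless of the core count. A sketch, using the same resultVectors as in the code below:
> {code:java}
> // Untested sketch: numSlices = 1 keeps all rows in one partition, so the
> // correlation statistics are accumulated in a fixed order, at the cost of
> // losing parallelism for this step.
> RDD<Vector> singlePartition = ctx.parallelize(resultVectors, 1).rdd();
> Matrix mStable = Statistics.corr(singlePartition, "pearson");
> {code}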
>  
>  
> {code:java}
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.mllib.linalg.DenseVector;
> import org.apache.spark.mllib.linalg.Matrix;
> import org.apache.spark.mllib.linalg.Vector;
> import org.apache.spark.mllib.stat.Statistics;
> import org.apache.spark.rdd.RDD;
> import org.junit.Assert;
> import java.io.BufferedReader;
> import java.io.FileReader;
> import java.io.IOException;
> import java.math.RoundingMode;
> import java.text.DecimalFormat;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.List;
> import java.util.stream.Collectors;
> public class TestSparkCorr {
>
>     private static JavaSparkContext ctx;
>
>     public static void main(String[] args) {
>         List<List<Double>> doublesLists = readInputFile();
>         List<Vector> resultVectors = getVectorsList(doublesLists);
>         //===========================================================================
>         initSpark("*");
>         RDD<Vector> RDD_vector = ctx.parallelize(resultVectors).rdd();
>         Matrix m = Statistics.corr(RDD_vector, "pearson");
>         stopSpark();
>         //===========================================================================
>         initSpark("3");
>         RDD<Vector> RDD_vector_3 = ctx.parallelize(resultVectors).rdd();
>         Matrix m3 = Statistics.corr(RDD_vector_3, "pearson");
>         stopSpark();
>         //===========================================================================
>         Assert.assertEquals(m3, m);
>     }
>
>     private static List<Vector> getVectorsList(List<List<Double>> doublesLists) {
>         List<Vector> resultVectors = new ArrayList<>(doublesLists.size());
>         for (List<Double> vector : doublesLists) {
>             double[] x = new double[vector.size()];
>             for (int i = 0; i < x.length; i++) {
>                 x[i] = vector.get(i);
>             }
>             resultVectors.add(new DenseVector(x));
>         }
>         return resultVectors;
>     }
>
>     private static List<List<Double>> readInputFile() {
>         List<List<Double>> doublesLists = new ArrayList<>();
>         try (BufferedReader reader = new BufferedReader(new FileReader(
>                 ".//output//vectorList.txt"))) {
>             String line = reader.readLine();
>             while (line != null) {
>                 String[] splitLine = line.substring(1, line.length() - 2).split(",");
>                 List<Double> doubles = Arrays.stream(splitLine).map(x -> Double.valueOf(x.trim())).collect(Collectors.toList());
>                 doublesLists.add(doubles);
>                 // read next line
>                 line = reader.readLine();
>             }
>         } catch (IOException e) {
>             e.printStackTrace();
>         }
>         return doublesLists;
>     }
>
>     private static void initSpark(String coresNum) {
>         final SparkConf sparkConf = new SparkConf().setAppName("Span");
>         sparkConf.setMaster(String.format("local[%s]", coresNum));
>         sparkConf.set("spark.ui.enabled", "false");
>         ctx = new JavaSparkContext(sparkConf);
>     }
>
>     private static void stopSpark() {
>         ctx.stop();
>         if (ctx.sc().isStopped()) {
>             ctx = null;
>         }
>     }
> }
> {code}
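>  
> If bitwise-identical matrices are not strictly required, the final Assert.assertEquals could be replaced by an element-wise comparison with a tolerance. The helper below is an illustrative sketch only; the 1e-12 epsilon is my own choice, not a value derived from the attached data:
> {code:java}
> // Hypothetical helper: passes when the two matrices agree element-wise
> // within eps, which tolerates pure floating-point rounding differences.
> private static void assertMatricesClose(Matrix expected, Matrix actual, double eps) {
>     Assert.assertEquals(expected.numRows(), actual.numRows());
>     Assert.assertEquals(expected.numCols(), actual.numCols());
>     for (int i = 0; i < expected.numRows(); i++) {
>         for (int j = 0; j < expected.numCols(); j++) {
>             Assert.assertEquals(expected.apply(i, j), actual.apply(i, j), eps);
>         }
>     }
> }
> {code}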
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org