Posted to issues@spark.apache.org by "Lior Chaga (JIRA)" <ji...@apache.org> on 2015/04/26 08:26:38 UTC

[jira] [Updated] (SPARK-7032) SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause

     [ https://issues.apache.org/jira/browse/SPARK-7032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lior Chaga updated SPARK-7032:
------------------------------
    Description: 
When using a UNION or EXCEPT clause together with a GROUP BY clause in Spark SQL, the results do not match what is expected.

In the following example, exactly one record should be in the first table but not in the second: when grouping by the key field, the counter for key=1 sums to 10 in both tables, so only the key=2 row differs. Each clause works correctly when run on its own.
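
To make the expectation concrete (worked out here from the data in the code below; not part of the original report):

{code}
table1:            table2 after GROUP BY key, SUM(counter):
(1, 10)            (1, 10)   -- 4 + 6
(2, 10)            (2, 8)

table1 EXCEPT aggregated table2  =>  (2, 10)   -- exactly one row
{code}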


{code}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.Row;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SimpleApp {

    public static void main(String[] args) throws IOException {

        SparkConf conf = new SparkConf().setAppName("Simple Application")
                .setMaster("local[1]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // table1: (key=1, counter=10), (key=2, counter=10)
        List<MyObject> firstList = new ArrayList<MyObject>(2);
        firstList.add(new MyObject(1, 10));
        firstList.add(new MyObject(2, 10));

        // table2: key=1 counters sum to 10 (4 + 6); key=2 sums to 8
        List<MyObject> secondList = new ArrayList<MyObject>(3);
        secondList.add(new MyObject(1, 4));
        secondList.add(new MyObject(1, 6));
        secondList.add(new MyObject(2, 8));

        JavaRDD<MyObject> firstRdd = sc.parallelize(firstList);
        JavaRDD<MyObject> secondRdd = sc.parallelize(secondList);

        JavaSQLContext sqlc = new JavaSQLContext(sc);

        // Register both RDDs as temp tables and cache them.
        sqlc.applySchema(firstRdd, MyObject.class).registerTempTable("table1");
        sqlc.sqlContext().cacheTable("table1");
        sqlc.applySchema(secondRdd, MyObject.class).registerTempTable("table2");
        sqlc.sqlContext().cacheTable("table2");

        // Expected: only (2, 10) is in table1 but not in the aggregated
        // table2, so the result should contain exactly one row.
        List<Row> firstMinusSecond = sqlc.sql(
            "SELECT key, counter FROM table1 " +
            "EXCEPT " +
            "SELECT key, SUM(counter) FROM table2 " +
            "GROUP BY key ").collect();

        System.out.println("num of rows in first but not in second = [" + firstMinusSecond.size() + "]");

        sc.close();
        System.exit(0);
    }

    public static class MyObject implements Serializable {

        public MyObject(Integer key, Integer counter) {
            this.key = key;
            this.counter = counter;
        }

        private Integer key;
        private Integer counter;

        public Integer getKey() {
            return key;
        }

        public void setKey(Integer key) {
            this.key = key;
        }

        public Integer getCounter() {
            return counter;
        }

        public void setCounter(Integer counter) {
            this.counter = counter;
        }
    }

}
{code}
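
A workaround worth trying while this is open (a sketch only, not verified against 1.2.2; the table name table2_agg is made up for illustration): materialize the GROUP BY result as its own temp table first, so the EXCEPT runs between two plain projections. The CAST guards against a possible type mismatch, since in some Spark versions SUM over an integer column widens to a long, which would make otherwise-equal rows compare unequal under EXCEPT.

{code}
// Hypothetical workaround sketch (untested): pre-aggregate table2 into its
// own temp table, then run EXCEPT between two un-aggregated selects.
sqlc.sql(
    "SELECT key, CAST(SUM(counter) AS INT) AS counter FROM table2 " +
    "GROUP BY key").registerTempTable("table2_agg");

// Same EXCEPT as above, but both sides are now simple projections.
List<Row> workaround = sqlc.sql(
    "SELECT key, counter FROM table1 " +
    "EXCEPT " +
    "SELECT key, counter FROM table2_agg").collect();

System.out.println("workaround rows = [" + workaround.size() + "]");
{code}

Comparing this result with the original query would help narrow down whether the aggregate directly under the EXCEPT, or the set operation itself, is what misbehaves.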

> SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause
> -----------------------------------------------------------------------
>
>                 Key: SPARK-7032
>                 URL: https://issues.apache.org/jira/browse/SPARK-7032
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.2.2
>            Reporter: Lior Chaga


