You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Lior Chaga (JIRA)" <ji...@apache.org> on 2015/04/26 08:26:38 UTC
[jira] [Updated] (SPARK-7032) SparkSQL incorrect results when using
UNION/EXCEPT with GROUP BY clause
[ https://issues.apache.org/jira/browse/SPARK-7032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lior Chaga updated SPARK-7032:
------------------------------
Description:
When a UNION/EXCEPT clause is combined with a GROUP BY clause in Spark SQL, the results do not match what is expected.
In the following example, exactly 1 record should be in the first table and not in the second (after grouping by the key field, the counter for key=1 is 10 in both tables, so only the key=2 row differs: 10 versus 8).
Each of the clauses works properly by itself when it is run separately.
{code}
//import com.addthis.metrics.reporter.config.ReporterConfig;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.Row;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
 * Stand-alone reproduction program: registers two small cached temp tables and runs an
 * EXCEPT query whose right-hand side aggregates with GROUP BY, then prints the number
 * of rows returned.
 */
public class SimpleApp {

    /**
     * Builds the two input tables, runs the EXCEPT / GROUP BY query and prints the
     * resulting row count before shutting the context down.
     *
     * @param args command-line arguments; not read
     * @throws IOException kept from the original signature
     */
    public static void main(String[] args) throws IOException {
        SparkConf sparkConf = new SparkConf()
                .setAppName("Simple Application")
                .setMaster("local[1]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // table1 rows: (key=1, counter=10), (key=2, counter=10)
        List<MyObject> leftRows = new ArrayList<MyObject>(2);
        leftRows.add(new MyObject(1, 10));
        leftRows.add(new MyObject(2, 10));

        // table2 rows: key=1 is split into 4 + 6, key=2 has 8
        List<MyObject> rightRows = new ArrayList<MyObject>(3);
        rightRows.add(new MyObject(1, 4));
        rightRows.add(new MyObject(1, 6));
        rightRows.add(new MyObject(2, 8));

        JavaRDD<MyObject> leftRdd = sc.parallelize(leftRows);
        JavaRDD<MyObject> rightRdd = sc.parallelize(rightRows);

        JavaSQLContext sqlc = new JavaSQLContext(sc);
        registerCached(sqlc, leftRdd, "table1");
        registerCached(sqlc, rightRdd, "table2");

        String exceptQuery =
                "SELECT key, counter FROM table1 "
                        + "EXCEPT "
                        + "SELECT key, SUM(counter) FROM table2 "
                        + "GROUP BY key ";
        List<Row> onlyInFirst = sqlc.sql(exceptQuery).collect();

        System.out.println("num of rows in first but not in second = [" + onlyInFirst.size() + "]");

        sc.close();
        System.exit(0);
    }

    /** Applies the {@link MyObject} schema to {@code rows}, registers it as a temp table and caches it. */
    private static void registerCached(JavaSQLContext sqlc, JavaRDD<MyObject> rows, String tableName) {
        sqlc.applySchema(rows, MyObject.class).registerTempTable(tableName);
        sqlc.sqlContext().cacheTable(tableName);
    }

    /** Serializable row type with a {@code key} and a {@code counter}, exposed through getters and setters. */
    public static class MyObject implements Serializable {

        private Integer key;
        private Integer counter;

        /**
         * @param key     value of the {@code key} column
         * @param counter value of the {@code counter} column
         */
        public MyObject(Integer key, Integer counter) {
            this.key = key;
            this.counter = counter;
        }

        /** @return the row's key */
        public Integer getKey() {
            return key;
        }

        /** @param key new key for this row */
        public void setKey(Integer key) {
            this.key = key;
        }

        /** @return the row's counter */
        public Integer getCounter() {
            return counter;
        }

        /** @param counter new counter for this row */
        public void setCounter(Integer counter) {
            this.counter = counter;
        }
    }
}
{code}
was:
When using UNION/EXCEPT clause with GROUP BY clause in spark sql, results do not match expected.
In the following example, only 1 record should be in first table and not in second (as when grouping by key field, the counter for key=1 is 10 in both tables).
Each of the clauses by itself is working properly when running exclusively.
{code}
//import com.addthis.metrics.reporter.config.ReporterConfig;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.Row;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
 * Stand-alone program from the previous revision of the description: registers two
 * cached temp tables and prints how many rows an EXCEPT / GROUP BY query returns.
 */
public class SimpleApp {
/**
 * Builds the input lists, registers them as "table1" and "table2", runs the query
 * and prints the size of the collected result.
 *
 * @param args command-line arguments; not read
 */
public static void main(String[] args) throws IOException {
SparkConf conf = new SparkConf().setAppName("Simple Application")
.setMaster("local[1]");
JavaSparkContext sc = new JavaSparkContext(conf);
// Rows intended for table1: (1, 10) and (2, 10).
List<MyObject> firstList = new ArrayList<MyObject>(2);
firstList.add(new MyObject(1, 10));
firstList.add(new MyObject(2, 10));
// Rows intended for table2: key=1 split into 4 + 6, key=2 with 8.
List<MyObject> secondList = new ArrayList<MyObject>(3);
secondList.add(new MyObject(1, 4));
secondList.add(new MyObject(1, 6));
secondList.add(new MyObject(2, 8));
JavaRDD<MyObject> firstRdd = sc.parallelize(firstList);
// NOTE(review): secondRdd is parallelized from firstList, not secondList, so
// secondList above is never used and table2 receives the same rows as table1.
JavaRDD<MyObject> secondRdd = sc.parallelize(firstList);
JavaSQLContext sqlc = new JavaSQLContext(sc);
// Register each RDD as a temp table using MyObject as the schema, then cache it.
sqlc.applySchema(firstRdd, MyObject.class).registerTempTable("table1");
sqlc.sqlContext().cacheTable("table1");
sqlc.applySchema(secondRdd, MyObject.class).registerTempTable("table2");
sqlc.sqlContext().cacheTable("table2");
// Rows of table1 that do not appear in table2 after summing counter per key.
List<Row> firstMinusSecond = sqlc.sql(
"SELECT key, counter FROM table1 " +
"EXCEPT " +
"SELECT key, SUM(counter) FROM table2 " +
"GROUP BY key ").collect();
System.out.println("num of rows in first but not in second = [" + firstMinusSecond.size() + "]");
sc.close();
System.exit(0);
}
/** Serializable row type holding a key and a counter, with getters and setters for both. */
public static class MyObject implements Serializable {
/**
 * @param key value stored as this row's key
 * @param counter value stored as this row's counter
 */
public MyObject(Integer key, Integer counter) {
this.key = key;
this.counter = counter;
}
private Integer key;
private Integer counter;
/** @return the row's key */
public Integer getKey() {
return key;
}
/** @param key new key for this row */
public void setKey(Integer key) {
this.key = key;
}
/** @return the row's counter */
public Integer getCounter() {
return counter;
}
/** @param counter new counter for this row */
public void setCounter(Integer counter) {
this.counter = counter;
}
}
}
{code}
> SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause
> -----------------------------------------------------------------------
>
> Key: SPARK-7032
> URL: https://issues.apache.org/jira/browse/SPARK-7032
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.2.2
> Reporter: Lior Chaga
>
> When a UNION/EXCEPT clause is combined with a GROUP BY clause in Spark SQL, the results do not match what is expected.
> In the following example, exactly 1 record should be in the first table and not in the second (after grouping by the key field, the counter for key=1 is 10 in both tables, so only the key=2 row differs: 10 versus 8).
> Each of the clauses works properly by itself when it is run separately.
> {code}
> //import com.addthis.metrics.reporter.config.ReporterConfig;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.sql.api.java.JavaSQLContext;
> import org.apache.spark.sql.api.java.Row;
> import java.io.IOException;
> import java.io.Serializable;
> import java.util.ArrayList;
> import java.util.List;
> public class SimpleApp {
> public static void main(String[] args) throws IOException {
> SparkConf conf = new SparkConf().setAppName("Simple Application")
> .setMaster("local[1]");
> JavaSparkContext sc = new JavaSparkContext(conf);
> List<MyObject> firstList = new ArrayList<MyObject>(2);
> firstList.add(new MyObject(1, 10));
> firstList.add(new MyObject(2, 10));
> List<MyObject> secondList = new ArrayList<MyObject>(3);
> secondList.add(new MyObject(1, 4));
> secondList.add(new MyObject(1, 6));
> secondList.add(new MyObject(2, 8));
> JavaRDD<MyObject> firstRdd = sc.parallelize(firstList);
> JavaRDD<MyObject> secondRdd = sc.parallelize(secondList);
> JavaSQLContext sqlc = new JavaSQLContext(sc);
> sqlc.applySchema(firstRdd, MyObject.class).registerTempTable("table1");
> sqlc.sqlContext().cacheTable("table1");
> sqlc.applySchema(secondRdd, MyObject.class).registerTempTable("table2");
> sqlc.sqlContext().cacheTable("table2");
> List<Row> firstMinusSecond = sqlc.sql(
> "SELECT key, counter FROM table1 " +
> "EXCEPT " +
> "SELECT key, SUM(counter) FROM table2 " +
> "GROUP BY key ").collect();
> System.out.println("num of rows in first but not in second = [" + firstMinusSecond.size() + "]");
> sc.close();
> System.exit(0);
> }
> public static class MyObject implements Serializable {
> public MyObject(Integer key, Integer counter) {
> this.key = key;
> this.counter = counter;
> }
> private Integer key;
> private Integer counter;
> public Integer getKey() {
> return key;
> }
> public void setKey(Integer key) {
> this.key = key;
> }
> public Integer getCounter() {
> return counter;
> }
> public void setCounter(Integer counter) {
> this.counter = counter;
> }
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org