You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Emlyn Corrin (JIRA)" <ji...@apache.org> on 2016/01/26 10:33:39 UTC

[jira] [Comment Edited] (SPARK-9740) first/last aggregate NULL behavior

    [ https://issues.apache.org/jira/browse/SPARK-9740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15116977#comment-15116977 ] 

Emlyn Corrin edited comment on SPARK-9740 at 1/26/16 9:32 AM:
--------------------------------------------------------------

I've put together a minimal example to demonstrate the problem:
{code}
package spark_test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.hive.HiveContext;

public class SparkTestMain {
    public static void main(String[] args) {
        assert args.length == 1;
        SparkConf sparkConf = new SparkConf().setAppName("SparkTest");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        SQLContext sqlCtx = new HiveContext(ctx); // I tried both SQLContext and HiveContext here
        DataFrame df = sqlCtx.read().json(args[0]);
        System.out.println(df.schema().simpleString());
        DataFrame df2 = df.select(functions.expr("FIRST(value,true)")
                                  .over(Window.partitionBy(df.col("id"))
                                        .orderBy(df.col("time"))
                                        .rowsBetween(Long.MIN_VALUE, 0)));
        System.out.println(df2.take(5));
    }
}
{code}

I ran that with:
{code}
spark-submit --master local[*] spark-test.jar test.json
{code}
And it fails with:
{code}
struct<id:bigint,time:bigint,value:string>
Exception in thread "main" java.lang.UnsupportedOperationException: 'FIRST('value,true) is not supported in a window operation.
	at org.apache.spark.sql.expressions.WindowSpec.withAggregate(WindowSpec.scala:191)
	at org.apache.spark.sql.Column.over(Column.scala:1049)
	at spark_test.SparkTestMain.main(SparkTestMain.java:23)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
{code}


was (Author: emlyn):
I've put together a minimal example to demonstrate the problem:
{code}
package spark_test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.hive.HiveContext;

public class SparkTestMain {
    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("Usage: SparkTest <file>");
            System.exit(1);
        }
        SparkConf sparkConf = new SparkConf().setAppName("SparkTest");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        SQLContext sqlCtx = new HiveContext(ctx); // I tried both SQLContext and HiveContext here
        DataFrame df = sqlCtx.read().json(args[0]);
        System.out.println(df.schema().simpleString());
        DataFrame df2 = df.select(functions.expr("FIRST(value,true)")
                                  .over(Window.partitionBy(df.col("id"))
                                        .orderBy(df.col("time"))
                                        .rowsBetween(Long.MIN_VALUE, 0)));
        System.out.println(df2.take(5));
    }
}
{code}

I ran that with:
{code}
spark-submit --master local[*] spark-test.jar test.json
{code}
And it fails with:
{code}
struct<id:bigint,time:bigint,value:string>
Exception in thread "main" java.lang.UnsupportedOperationException: 'FIRST('value,true) is not supported in a window operation.
	at org.apache.spark.sql.expressions.WindowSpec.withAggregate(WindowSpec.scala:191)
	at org.apache.spark.sql.Column.over(Column.scala:1049)
	at spark_test.SparkTestMain.main(SparkTestMain.java:23)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
{code}

> first/last aggregate NULL behavior
> ----------------------------------
>
>                 Key: SPARK-9740
>                 URL: https://issues.apache.org/jira/browse/SPARK-9740
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>            Reporter: Herman van Hovell
>            Assignee: Yin Huai
>              Labels: releasenotes
>             Fix For: 1.6.0
>
>
> The FIRST/LAST aggregates implemented as part of the new UDAF interface, return the first or last non-null value (if any) found. This is a departure from the behavior of the old FIRST/LAST aggregates and from the FIRST_VALUE/LAST_VALUE aggregates in Hive. These would return a null value, if that happened to be the first/last value seen. SPARK-9592 tries to 'fix' this behavior for the old UDAF interface.
> Hive makes this behavior configurable, by adding a skipNulls flag. I would suggest to do the same, and make the default behavior compatible with Hive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org