Posted to issues@spark.apache.org by "Herman van Hovell (JIRA)" <ji...@apache.org> on 2016/11/09 21:35:58 UTC
[jira] [Commented] (SPARK-18388) Running aggregation on many columns throws SOE
[ https://issues.apache.org/jira/browse/SPARK-18388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15652104#comment-15652104 ]
Herman van Hovell commented on SPARK-18388:
-------------------------------------------
Could you try this on master? We added an optimizer rule that collapses similar windows (SPARK-17739): https://github.com/apache/spark/commit/aef506e39a41cfe7198162c324a11ef2f01136c3
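For reference, one way to check whether that rule kicks in is to run a few window functions over the same window spec and count the Window operators in the plan. A minimal sketch with made-up data (the expected plan shape is my reading of the SPARK-17739 commit above, not something stated in this ticket):
{code}
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[2]").appName("spark-18388-check").getOrCreate()

df = spark.createDataFrame(
    [(1, 1, 10.0), (1, 2, 20.0), (2, 1, 30.0)],
    ["id", "dt", "x"])

w = Window.partitionBy("id").orderBy("dt").rangeBetween(-6, 0)

# Several window functions over the same window spec. With the SPARK-17739
# rule, similar adjacent Window operators should be collapsed, so the
# optimized plan should contain fewer Window nodes than without it.
out = df.select(
    "id", "dt",
    F.mean("x").over(w).alias("mean_7_x"),
    F.count("x").over(w).alias("count_7_x"),
    F.sum("x").over(w).alias("sum_7_x"))

out.explain(True)  # count the Window operators in the optimized/physical plan
{code}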
> Running aggregation on many columns throws SOE
> ----------------------------------------------
>
> Key: SPARK-18388
> URL: https://issues.apache.org/jira/browse/SPARK-18388
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.5.2, 1.6.2, 2.0.1
> Environment: PySpark 2.0.1, Jupyter
> Reporter: Raviteja Lokineni
> Priority: Critical
> Attachments: spark-bug-jupyter.py, spark-bug-stacktrace.txt, spark-bug.csv
>
>
> Use case: I am generating weekly aggregates of every column of the data
> {code}
> from pyspark.sql.window import Window
> from pyspark.sql.functions import *
> timeSeries = sqlContext.read.option("header", "true").format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").load("file:///tmp/spark-bug.csv")
> # Hive timestamp is interpreted as UNIX timestamp in seconds
> days = lambda i: i * 86400
> w = (Window()
>      .partitionBy("id")
>      .orderBy(col("dt").cast("timestamp").cast("long"))
>      .rangeBetween(-days(6), 0))
> cols = ["id", "dt"]
> skipCols = ["id", "dt"]
> for col in timeSeries.columns:
>     if col in skipCols:
>         continue
>     cols.append(mean(col).over(w).alias("mean_7_"+col))
>     cols.append(count(col).over(w).alias("count_7_"+col))
>     cols.append(sum(col).over(w).alias("sum_7_"+col))
>     cols.append(min(col).over(w).alias("min_7_"+col))
>     cols.append(max(col).over(w).alias("max_7_"+col))
> df = timeSeries.select(cols)
> df.orderBy('id', 'dt').write.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").save("file:///tmp/spark-bug-out.csv")
> {code}
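Since spark-bug.csv is only attached to the JIRA, here is a self-contained variant of the reproduction above that builds a synthetic wide DataFrame instead of reading the CSV. It is only a sketch: the column names, row counts, and output path are placeholders, not the original data.
{code}
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[2]").appName("spark-18388-repro").getOrCreate()

n_cols, n_rows = 50, 100   # raise n_cols to stress the planner further

# Synthetic wide table standing in for the attached spark-bug.csv.
value_cols = ["c{}".format(i) for i in range(n_cols)]
rows = [tuple([i % 3, i * 86400] + [float(i + j) for j in range(n_cols)])
        for i in range(n_rows)]
timeSeries = spark.createDataFrame(rows, ["id", "dt"] + value_cols)

days = lambda i: i * 86400
w = (Window.partitionBy("id")
     .orderBy("dt")
     .rangeBetween(-days(6), 0))

# Same pattern as in the report: five window aggregates per value column.
cols = ["id", "dt"]
for c in value_cols:
    for fn, name in [(F.mean, "mean"), (F.count, "count"), (F.sum, "sum"),
                     (F.min, "min"), (F.max, "max")]:
        cols.append(fn(c).over(w).alias("{}_7_{}".format(name, c)))

df = timeSeries.select(cols)
df.orderBy("id", "dt").write.mode("overwrite").csv("/tmp/spark-bug-out")
{code}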
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)