Posted to user@spark.apache.org by Anton Puzanov <an...@gmail.com> on 2018/01/02 14:05:11 UTC

Current way of using functions.window with Java

I am writing a sliding-window analytics program and use the functions.window
function (
https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/functions.html#window(org.apache.spark.sql.Column,%20java.lang.String,%20java.lang.String)
)
The code looks like this:

Column slidingWindow =
    functions.window(myDF.col("timestamp"), "24 hours", "1 seconds");
Dataset<Row> finalRes = myDF
    .groupBy(slidingWindow, myDF.col("user"))
    .agg(functions.collect_set("purchase").as("purchases"));


As you can see, in this use case I have a small slide step and a large window.
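
If I understand the physical plan correctly (this is only my reading of it,
so treat it as an assumption), Spark expands every input row into one copy
per overlapping window, and with these parameters that expansion is enormous:

// My own back-of-the-envelope arithmetic, not taken from Spark internals:
// each row falls into windowDuration / slideDuration overlapping windows.
long windowSeconds = 24L * 60L * 60L; // "24 hours" = 86,400 s
long slideSeconds  = 1L;              // "1 seconds"
long windowsPerRow = windowSeconds / slideSeconds;
System.out.println(windowsPerRow);    // 86400 copies of every input row

If that is right, it would explain why code generation blows up even on
tiny inputs.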
Code of the same flavor caused the following error (which, as I understand
it, is related to Java code generation):

Caused by: org.spark_project.guava.util.concurrent.ExecutionError:
java.lang.OutOfMemoryError: Java heap space
    at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2261)
    at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
    at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
    at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:890)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:357)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:85)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:121)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:112)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 77 more
Caused by: java.lang.OutOfMemoryError: Java heap space
    at java.util.HashMap.resize(HashMap.java:703)
    at java.util.HashMap.putVal(HashMap.java:628)
    at java.util.HashMap.putMapEntries(HashMap.java:514)
    at java.util.HashMap.putAll(HashMap.java:784)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3073)
    at org.codehaus.janino.UnitCompiler.access$4900(UnitCompiler.java:206)
    at org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:2958)
    at org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:2926)
    at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:2974)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2925)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3033)
    at org.codehaus.janino.UnitCompiler.access$4400(UnitCompiler.java:206)
    at org.codehaus.janino.UnitCompiler$8.visitSwitchStatement(UnitCompiler.java:2950)
    at org.codehaus.janino.UnitCompiler$8.visitSwitchStatement(UnitCompiler.java:2926)
    at org.codehaus.janino.Java$SwitchStatement.accept(Java.java:2866)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2925)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2982)
    at org.codehaus.janino.UnitCompiler.access$3800(UnitCompiler.java:206)
    at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:2944)
    at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:2926)
    at org.codehaus.janino.Java$Block.accept(Java.java:2471)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2925)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2999)
    at org.codehaus.janino.UnitCompiler.access$4000(UnitCompiler.java:206)
    at org.codehaus.janino.UnitCompiler$8.visitForStatement(UnitCompiler.java:2946)
    at org.codehaus.janino.UnitCompiler$8.visitForStatement(UnitCompiler.java:2926)
    at org.codehaus.janino.Java$ForStatement.accept(Java.java:2660)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2925)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2982)
    at org.codehaus.janino.UnitCompiler.access$3800(UnitCompiler.java:206)
    at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:2944)

When I run the code with really small data and window = 3 minutes,
step = 1 second (i.e., 180 overlapping windows per row), I get a different
error. The generated code contains tens of thousands of lines like:
/* 30761 */           if (!expand_isNull6254) {
/* 30762 */             expand_isNull6253 = false; // resultCode could change nullability.
/* 30763 */             expand_value6253 = expand_value6254 * 1000000L;
and then the error:

org.codehaus.janino.JaninoRuntimeException: Code of method "processNext()V" of
class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator"
grows beyond 64 KB
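
One mitigation I have considered (assuming the internal flag
spark.sql.codegen.wholeStage is still honored in 2.1, which I have not
verified) is to disable whole-stage code generation so the plan falls back
to the non-fused path:

SparkSession spark = SparkSession.builder()
    .appName("sliding-window-experiment")            // hypothetical app name
    .config("spark.sql.codegen.wholeStage", "false") // skip fused codegen
    .getOrCreate();

But even if that avoids the giant generated method, each row is presumably
still expanded once per window, so it would not fix the underlying blow-up.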

I also see hundreds of "Code generated in 4.979587 ms" messages, and the
query takes quite a long time to compute even though the input has only
10 rows of data.
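
The only workaround I can sketch myself is to coarsen the slide so the
per-row expansion stays small, something like this (untested, and it
obviously changes the granularity of the results):

// Hypothetical: a 10-minute slide instead of 1 second.
// 24 h / 10 min = 144 overlapping windows per row instead of 86,400.
Column coarserWindow =
    functions.window(myDF.col("timestamp"), "24 hours", "10 minutes");
Dataset<Row> coarseRes = myDF
    .groupBy(coarserWindow, myDF.col("user"))
    .agg(functions.collect_set("purchase").as("purchases"));

But I need 1-second resolution, hence the questions below.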


Am I doing something wrong? Is it a bug? What is the right way to use
this function?

Please keep answers specific to the Java API.