Posted to user@spark.apache.org by Anton Puzanov <an...@gmail.com> on 2018/01/02 14:05:11 UTC
Current way of using functions.window with Java
I am writing a sliding-window analytics program that uses the functions.window function
(https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/functions.html#window(org.apache.spark.sql.Column,%20java.lang.String,%20java.lang.String)).
The code looks like this:
Column slidingWindow = functions.window(myDF.col("timestamp"), "24 hours", "1 seconds");
Dataset<Row> finalRes = myDF.groupBy(slidingWindow, myDF.col("user"))
        .agg(functions.collect_set("purchase").as("purchases"));
As you can see, in this use case the slide step is small and the window is large.
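Grouping by a sliding window means each row is counted in every window whose [start, end) interval contains its timestamp, so the number of windows per row grows with window length divided by slide. The plain-Java sketch below models that assignment for illustration only. It is not Spark's implementation, the names `SlidingWindowSketch` and `windowsFor` are hypothetical, and it assumes windows are aligned to the Unix epoch (the documented default for the three-argument functions.window overload).

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {

    /**
     * Hypothetical helper: returns the start times (ms since epoch) of every
     * sliding window [start, start + windowMs) that contains timestampMs,
     * assuming windows are aligned to the epoch (start offset 0).
     */
    static List<Long> windowsFor(long timestampMs, long windowMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest slide-aligned window start at or before the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Step back one slide at a time while the window still covers the timestamp,
        // i.e. while start <= timestamp < start + windowMs.
        for (long start = lastStart; start > timestampMs - windowMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Toy parameters: 10-second windows sliding every 2 seconds.
        List<Long> starts = windowsFor(15_000L, 10_000L, 2_000L);
        System.out.println(starts.size() + " windows: " + starts);
    }
}
```

With these toy values a single timestamp falls into 10 / 2 = 5 windows; with "24 hours" and "1 seconds" the same model assigns every row to 86,400 windows.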
Code of the same flavor caused the following error, which as far as I understand is related to
Java code generation:
Caused by: org.spark_project.guava.util.concurrent.ExecutionError: java.lang.OutOfMemoryError: Java heap space
at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2261)
at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:890)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:357)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:85)
at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:121)
at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:112)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 77 more
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.util.HashMap.resize(HashMap.java:703)
at java.util.HashMap.putVal(HashMap.java:628)
at java.util.HashMap.putMapEntries(HashMap.java:514)
at java.util.HashMap.putAll(HashMap.java:784)
at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3073)
at org.codehaus.janino.UnitCompiler.access$4900(UnitCompiler.java:206)
at org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:2958)
at org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:2926)
at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:2974)
at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2925)
at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3033)
at org.codehaus.janino.UnitCompiler.access$4400(UnitCompiler.java:206)
at org.codehaus.janino.UnitCompiler$8.visitSwitchStatement(UnitCompiler.java:2950)
at org.codehaus.janino.UnitCompiler$8.visitSwitchStatement(UnitCompiler.java:2926)
at org.codehaus.janino.Java$SwitchStatement.accept(Java.java:2866)
at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2925)
at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2982)
at org.codehaus.janino.UnitCompiler.access$3800(UnitCompiler.java:206)
at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:2944)
at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:2926)
at org.codehaus.janino.Java$Block.accept(Java.java:2471)
at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2925)
at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2999)
at org.codehaus.janino.UnitCompiler.access$4000(UnitCompiler.java:206)
at org.codehaus.janino.UnitCompiler$8.visitForStatement(UnitCompiler.java:2946)
at org.codehaus.janino.UnitCompiler$8.visitForStatement(UnitCompiler.java:2926)
at org.codehaus.janino.Java$ForStatement.accept(Java.java:2660)
at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2925)
at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2982)
at org.codehaus.janino.UnitCompiler.access$3800(UnitCompiler.java:206)
at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:2944)
When I run the code on a very small dataset with window = 3 minutes and step = 1 second,
I get this error:
The output contains tens of thousands of lines of generated code like:
/* 30761 */ if (!expand_isNull6254) {
/* 30762 */ expand_isNull6253 = false; // resultCode could change nullability.
/* 30763 */ expand_value6253 = expand_value6254 * 1000000L;
I also get this error:
haus.janino.JaninoRuntimeException: Code of method "processNext()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" grows beyond 64 KB
I also see hundreds of "Code generated in 4.979587 ms" log messages, and the job takes a long
time to compute even though the input has only 10 rows.
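A rough way to quantify the cost of a small slide: if each row is assigned to every window that overlaps its timestamp, it is replicated about ceil(windowDuration / slideDuration) times. It is an assumption here, not something confirmed in this thread, that Spark 2.x materializes this with an Expand operator holding one projection per overlapping window, which would fit the many `expand_*` variables in the generated code above. The sketch below only does the arithmetic for the two configurations in this post; the class and method names are hypothetical.

```java
public class WindowFanOut {

    /** Hypothetical helper: windows overlapping any single timestamp, ceil(window / slide). */
    static long overlappingWindows(long windowSeconds, long slideSeconds) {
        // Integer ceiling division, valid for positive operands.
        return (windowSeconds + slideSeconds - 1) / slideSeconds;
    }

    public static void main(String[] args) {
        long day = overlappingWindows(24 * 60 * 60, 1);  // "24 hours" window, "1 seconds" slide
        long threeMin = overlappingWindows(3 * 60, 1);   // 3-minute window, 1-second slide
        System.out.println("24 h / 1 s  -> " + day + " windows per row");
        System.out.println("3 min / 1 s -> " + threeMin + " windows per row");
        // (row, window) pairs produced from the 10-row test input before aggregation.
        System.out.println("10 rows, 3 min / 1 s -> " + 10 * threeMin + " pairs");
    }
}
```

This factor depends only on the window and slide durations, not on the number of input rows, so under this model the size of the generated plan would stay large even for a tiny input.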
Am I doing something wrong? Is it a bug? What is the right way to use
this function?
Please answer in terms of the Java API.