Posted to issues@spark.apache.org by "Kim Min Woo (Jira)" <ji...@apache.org> on 2021/02/20 16:42:00 UTC
[jira] [Created] (SPARK-34485) GeneratedIterator grows beyond 64 KB in Spark Structured Streaming application
Kim Min Woo created SPARK-34485:
-----------------------------------
Summary: GeneratedIterator grows beyond 64 KB in Spark Structured Streaming application
Key: SPARK-34485
URL: https://issues.apache.org/jira/browse/SPARK-34485
Project: Spark
Issue Type: Bug
Components: SQL, Structured Streaming
Affects Versions: 3.0.1, 3.0.0
Environment: ||Name||Version||
|OS|Ubuntu 18.04|
|JAVA|11.0.9|
|Scala|2.12.10|
|Spark|3.0.1|
Reporter: Kim Min Woo
I ran the Spark Structured Streaming application shown below, but it failed with the following error:
{noformat}
ERROR CodeGenerator: failed to compile: org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass" in "generated.java": Code of method "expand_doConsume_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage3;Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/apache/spark/sql/catalyst/InternalRow;ZJZ)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3" grows beyond 64 KB
{noformat}
The query groups events with the sliding-window form of window (60-second windows that slide every 1 second):
{code:java}
def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column
{code}
However, if I increase slideDuration from 1 second to 2 seconds, the error no longer appears.
The code is simple, so I don't understand why a method in the generated iterator class grows beyond 64 KB.
The complete error output is available in [this Gist|https://gist.github.com/c357c28138f7f57505189839ecef7fc0].
In that output, the code generated for the "Expand" unary logical operator is unusually long.
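For illustration, the plain-Scala sketch below (no Spark dependency) counts how many sliding windows a single event falls into for the two slideDuration values mentioned above. It assumes windows are aligned to the epoch (the default startTime of 0), and the object and method names are hypothetical. Assuming Spark 3.0's TimeWindowing analyzer rule plans a sliding window as an Expand with ceil(windowDuration / slideDuration) projections (one per overlapping window), each input row is copied once per overlapping window, which would be consistent with the Expand code shrinking when slideDuration grows from 1 second to 2 seconds.

{code:java}
object SlidingWindowSketch {
  // Number of windows overlapping any single instant:
  // ceil(windowDuration / slideDuration).
  def overlappingWindows(windowMs: Long, slideMs: Long): Int =
    math.ceil(windowMs.toDouble / slideMs).toInt

  // Start times (ms) of every window [start, start + windowMs) that contains eventMs,
  // assuming windows start at multiples of slideMs (startTime = 0).
  def windowStarts(eventMs: Long, windowMs: Long, slideMs: Long): List[Long] = {
    // Latest window start at or before the event.
    val lastStart = eventMs - Math.floorMod(eventMs, slideMs)
    // Walk back one slide at a time while the window still covers the event.
    Iterator.iterate(lastStart)(_ - slideMs)
      .takeWhile(start => start + windowMs > eventMs)
      .toList
  }

  def main(args: Array[String]): Unit = {
    val windowMs = 60000L     // "60 seconds"
    val eventMs = 123456789L  // arbitrary example event time (hypothetical)
    for (slideMs <- Seq(1000L, 2000L)) { // "1 second" vs "2 seconds"
      val starts = windowStarts(eventMs, windowMs, slideMs)
      println(s"slide=${slideMs}ms: ${overlappingWindows(windowMs, slideMs)} overlapping windows, " +
        s"event falls into ${starts.size}")
    }
  }
}
{code}

Running main prints 60 windows for the 1-second slide and 30 for the 2-second slide: doubling slideDuration halves the number of rows each event is expanded into.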
Code
{code:java}
import org.apache.spark.sql.types._

// Schema of each CSV record in the Kafka message value.
def inputSchema: StructType = StructType(
  Seq(
    StructField("timestamp", LongType, nullable = false),
    StructField("missingInfo", LongType, nullable = true),
    StructField("jobId", LongType, nullable = false),
    StructField("taskId", LongType, nullable = false),
    StructField("machineId", LongType, nullable = true),
    StructField("eventType", IntegerType, nullable = false),
    StructField("userId", IntegerType, nullable = true),
    StructField("category", IntegerType, nullable = true),
    StructField("priority", IntegerType, nullable = false),
    StructField("cpu", FloatType, nullable = true),
    StructField("ram", FloatType, nullable = true),
    StructField("disk", FloatType, nullable = true),
    StructField("constraints", IntegerType, nullable = true)
  )
)
{code}
{code:java}
import org.apache.spark.sql.functions.{avg, col, from_csv, window}
import org.apache.spark.sql.types.TimestampType

// ss is the application's SparkSession and startTraceTime is a numeric offset;
// both are defined elsewhere in the application and are not shown in this report.
ss.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("enable.auto.commit", "false")
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", "1000")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .select(col("key"), from_csv(col("value"), inputSchema, Map("delimiter" -> ",")).as("task_event"))
  .withColumn("event_time", (col("task_event.timestamp") / 1000 + startTraceTime).cast(TimestampType))
  .where("task_event.eventType == 1")
  .dropDuplicates("key")
  .withWatermark("event_time", "60 seconds")
  .groupBy(
    // 60-second windows sliding every 1 second: each event belongs to 60 windows.
    window(col("event_time"), "60 seconds", "1 second"),
    col("task_event.jobId")
  ).agg(avg("task_event.cpu").as("avgCpu"))
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
  .awaitTermination()
{code}