Posted to issues@spark.apache.org by "Xiao Li (JIRA)" <ji...@apache.org> on 2018/07/13 05:31:01 UTC
[jira] [Updated] (SPARK-23004) Structured Streaming raises
"IllegalStateException: Cannot remove after already committed or aborted"
[ https://issues.apache.org/jira/browse/SPARK-23004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xiao Li updated SPARK-23004:
----------------------------
Fix Version/s: 2.4.0 (was: 3.0.0)
> Structured Streaming raises "IllegalStateException: Cannot remove after already committed or aborted"
> -----------------------------------------------------------------------------------------------------
>
> Key: SPARK-23004
> URL: https://issues.apache.org/jira/browse/SPARK-23004
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.1.0, 2.1.1, 2.1.2, 2.2.0, 2.2.1, 2.3.0
> Environment: Run on yarn or local both raise the exception.
> Reporter: secfree
> Assignee: Tathagata Das
> Priority: Major
> Fix For: 2.3.1, 2.4.0
>
>
> A structured streaming query with a streaming aggregation can throw the following error in rare cases.
> {code:java}
> java.lang.IllegalStateException: Cannot remove after already committed or aborted
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.remove(HDFSBackedStateStoreProvider.scala:143)
>     at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$1.hasNext(statefulOperators.scala:233)
>     at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
>     at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
>     at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:111)
>     at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:103)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>     at org.apache.spark.scheduler.Task.run(Task.scala:108)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> 18-01-05 13:29:57 WARN TaskSetManager:66 Lost task 68.0 in stage 1933.0 (TID 196171, localhost, executor driver): java.lang.IllegalStateException: Cannot remove after already committed or aborted
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659)
> {code}
>
> This can happen when the following conditions are accidentally hit.
> # Streaming aggregation with an aggregation function that is a subclass of {{[TypedImperativeAggregate|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L473]}} (for example, {{collect_set}}, {{collect_list}}, {{percentile}}, etc.).
> # Query running in {{update}} mode
> # After the shuffle, a partition has exactly 128 records.
> This happens because of the following.
> # The {{StateStoreSaveExec}} used in streaming aggregations has the [following logic|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L359] when used in {{update}} mode.
> ## There is an iterator that reads data from its parent iterator and updates the StateStore.
> ## When the parent iterator is fully consumed (i.e. {{baseIterator.hasNext}} returns false) then all state changes are committed by calling {{StateStore.commit}}.
> ## The implementation of {{StateStore.commit()}} in {{HDFSBackedStateStore}} does not allow itself to be called twice. However, the logic is such that if {{hasNext}} is called multiple times after {{baseIterator.hasNext}} has returned false, each call will invoke {{StateStore.commit}} again.
> ## For most aggregation functions, this is okay because {{hasNext}} is only called once. But that's not the case with {{TypedImperativeAggregate}} functions.
> # {{TypedImperativeAggregate}} functions are executed using {{ObjectHashAggregateExec}}, which will try to use two kinds of hashmaps for aggregations.
> ## It will first try to use an unsorted hashmap. If the size of the hashmap increases beyond a certain threshold (default 128), then it will switch to using a sorted hashmap.
> ## The [switching logic|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala] in {{ObjectAggregationIterator}} (used by {{ObjectHashAggregateExec}}) is such that when the number of records matches the threshold (i.e. 128), it will end up calling the {{iterator.hasNext}} twice.
> When this behavior is combined with the two conditions above, it leads to the error shown above. This latent bug has existed since Spark 2.1, when {{ObjectHashAggregateExec}} was introduced in Spark.
>
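The failure mode described above can be sketched without Spark. The following is a minimal, hypothetical model, not Spark's actual code: `ToyStateStore`, `CommitOnExhaustion`, `drainAndRecheck`, and the `guardCommit` flag are all names invented for illustration. It shows an iterator that commits every time `hasNext` finds the parent exhausted, and a consumer that calls `hasNext` once more after exhaustion. The guarded variant is only one possible way to make the commit idempotent and is not claimed to be the fix that was applied for this issue.

```java
import java.util.Arrays;
import java.util.Iterator;

public class DoubleCommitSketch {

    /** Toy stand-in for a state store whose commit() may only run once. */
    static final class ToyStateStore {
        private boolean committed = false;

        void commit() {
            if (committed) {
                throw new IllegalStateException("already committed or aborted");
            }
            committed = true;
        }

        boolean hasCommitted() {
            return committed;
        }
    }

    /** Iterator that commits whenever hasNext() finds the parent exhausted. */
    static final class CommitOnExhaustion<T> implements Iterator<T> {
        private final Iterator<T> parent;
        private final ToyStateStore store;
        private final boolean guardCommit; // hypothetical switch for the guarded variant

        CommitOnExhaustion(Iterator<T> parent, ToyStateStore store, boolean guardCommit) {
            this.parent = parent;
            this.store = store;
            this.guardCommit = guardCommit;
        }

        @Override
        public boolean hasNext() {
            if (parent.hasNext()) {
                return true;
            }
            // Parent exhausted: commit, unless the guard is on and we already did.
            if (!guardCommit || !store.hasCommitted()) {
                store.commit();
            }
            return false;
        }

        @Override
        public T next() {
            return parent.next();
        }
    }

    /** Drains the iterator, then calls hasNext() a second time after exhaustion. */
    static boolean drainAndRecheck(Iterator<Integer> it) {
        while (it.hasNext()) {
            it.next();
        }
        return it.hasNext();
    }

    /** Returns true if the extra hasNext() call triggers IllegalStateException. */
    static boolean failsOnRecheck(boolean guardCommit) {
        ToyStateStore store = new ToyStateStore();
        Iterator<Integer> it =
            new CommitOnExhaustion<>(Arrays.asList(1, 2, 3).iterator(), store, guardCommit);
        try {
            drainAndRecheck(it);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("unguarded fails: " + failsOnRecheck(false));
        System.out.println("guarded fails:   " + failsOnRecheck(true));
    }
}
```

In this sketch the unguarded iterator throws on the second `hasNext` call, while the guarded one tolerates it. This illustrates why a consumer that re-checks `hasNext` at a boundary (such as the 128-record threshold described above) exposes the problem only in rare cases.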