Posted to issues@spark.apache.org by "Khang Pham (Jira)" <ji...@apache.org> on 2020/10/23 20:48:00 UTC
[jira] [Updated] (SPARK-33232) ConcurrentAppendException while updating delta lake table
[ https://issues.apache.org/jira/browse/SPARK-33232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Khang Pham updated SPARK-33232:
-------------------------------
Description:
I have two Spark Streaming jobs running concurrently:
* Stream join job: joins a Kafka stream with another stream from Amazon SQS. The result is appended to Delta Lake table A.
* Upsert job: reads from Delta Lake table B and updates table A when there are matching IDs.
Environment:
* Databricks cloud runtime 7.2, Spark 3.0.0
The stream join job works fine, but the upsert job keeps failing.
Stack trace:
com.databricks.sql.transaction.tahoe.ConcurrentAppendException: Files were added to partition [dt=2020-xx-yy, request_hour=2020-xx-yy 23:00:00] by a concurrent update. Please try the operation again.
Conflicting commit: {"timestamp":1603477588946,"userId":"xxxxx","operation":"OPTIMIZE","operationParameters":{"predicate":[],"zOrderBy":[],"batchId":0,"auto":true},"job":{"jobId":"xxxxx","jobName":"Streaming join xxx","runId":"xxx","jobOwnerId":"xxxx","triggerType":"manual"},"notebook":{"notebookId":"xxxx"},"clusterId":"xxxxxx","readVersion":2222222222,"isolationLevel":"SnapshotIsolation","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"444444","numRemovedBytes":"64455537","p25FileSize":"63341646","minFileSize":"63341646","numAddedFiles":"1","maxFileSize":"63341646","p75FileSize":"63341646","p50FileSize":"63341646","numAddedBytes":"63341646"}}
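Note that the conflicting commit shown in the message is an auto-triggered OPTIMIZE ("operation":"OPTIMIZE", "auto":true), and the "isolationLevel" field describes that conflicting commit rather than the failed upsert transaction. A minimal sketch that pulls the relevant fields out of a reduced copy of the commit info (IDs elided as in the report):

```python
import json

# Reduced copy of the conflicting-commit JSON from the stack trace above;
# fields not relevant here (job, notebook, metrics) are dropped.
conflict = json.loads("""
{"timestamp": 1603477588946,
 "operation": "OPTIMIZE",
 "operationParameters": {"predicate": [], "zOrderBy": [], "batchId": 0, "auto": true},
 "readVersion": 2222222222,
 "isolationLevel": "SnapshotIsolation",
 "isBlindAppend": false}
""")

# "auto": true marks this as an auto-compaction OPTIMIZE, and the reported
# isolation level belongs to this commit, not to the upsert that failed.
print(conflict["operation"], conflict["operationParameters"]["auto"],
      conflict["isolationLevel"])
```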
Table A has these settings:
- 'delta.isolationLevel' = 'WriteSerializable'
- spark.databricks.delta.optimizeWrite.enable = True
- spark.databricks.delta.autoCompact.enabled = True
Other settings:
spark.databricks.io.cache.compression.enabled true
stateStore = rocksdb
spark.sql.adaptive.enabled true
spark.sql.adaptive.skewJoin.enabled true
I already set the isolation level to WriteSerializable to handle ConcurrentAppendException, as described in [https://docs.databricks.com/delta/optimizations/isolation-level.html]
However, the error says "SnapshotIsolation".
What did I miss?
> ConcurrentAppendException while updating delta lake table
> ---------------------------------------------------------
>
> Key: SPARK-33232
> URL: https://issues.apache.org/jira/browse/SPARK-33232
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 3.0.0
> Reporter: Khang Pham
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)