You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Zhen Wang (Jira)" <ji...@apache.org> on 2022/07/28 12:06:00 UTC
[jira] [Updated] (SPARK-37210) An error occurred while concurrently writing to different static partitions
[ https://issues.apache.org/jira/browse/SPARK-37210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhen Wang updated SPARK-37210:
------------------------------
Description:
An error occurred while concurrently writing to different static partitions.
When writing to a static partition, committerOutputPath is the table's location path. When multiple tasks write to the same table concurrently, the _temporary directory under that path is deleted as soon as one task ends, causing the other tasks to fail.
test code:
{code:java}
// code placeholder
object HiveTests {
/**
 * Reproduction driver: issues two INSERT OVERWRITE statements from separate
 * threads, each targeting a different static partition of the same table
 * (`default.test`).
 *
 * @param args command-line arguments (not read by this method)
 */
def main(args: Array[String]): Unit = {
// Local-mode session with Hive support enabled.
val spark = SparkSession
.builder()
.master("local[*]")
.appName("HiveTests")
.enableHiveSupport()
.getOrCreate()
// Sample rows; each row carries (id, name, dt, hour) to match the schema below.
val users1 = new util.ArrayList[Row]()
users1.add(Row(1, "user1", "2021-11-03", 10))
users1.add(Row(2, "user2", "2021-11-03", 10))
users1.add(Row(3, "user3", "2021-11-03", 10))
// Schema of the sample rows: four nullable columns.
val structType = StructType(Array(
StructField("id", IntegerType, true),
StructField("name", StringType, true),
StructField("dt", StringType, true),
StructField("hour", IntegerType, true)
))
// Set the Hive dynamic-partition options, then recreate the target table from scratch.
spark.sql("set hive.exec.dynamic.partition=true")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("drop table if exists default.test")
spark.sql(
"""
|create table if not exists default.test (
| id int,
| name string)
|partitioned by (dt string, hour int)
|stored as parquet
|""".stripMargin)
spark.sql("desc formatted default.test").show()
// Only id and name are exposed through the temp view; the partition values
// are supplied as literals in the INSERT statements below.
spark.sqlContext
.createDataFrame(users1, structType)
.select("id", "name")
.createOrReplaceTempView("user1")
// First writer: overwrites static partition dt='2021-11-03', hour=10.
val thread1 = new Thread(() => {
// Commented-out INSERT INTO variant of the same statement:
// spark.sql("INSERT INTO TABLE test PARTITION(dt = '2021-11-03', hour=10) select * from user1")
spark.sql("INSERT OVERWRITE TABLE test PARTITION(dt = '2021-11-03', hour=10) select * from user1")
})
thread1.start()
// Second writer: overwrites a different static partition (dt='2021-11-04', hour=10)
// of the same table. It is started without waiting for the first writer, so the
// two statements can run concurrently.
val thread2 = new Thread(() => {
// Commented-out INSERT INTO variant of the same statement:
// spark.sql("INSERT INTO TABLE test PARTITION(dt = '2021-11-04', hour=10) select * from user1")
spark.sql("INSERT OVERWRITE TABLE test PARTITION(dt = '2021-11-04', hour=10) select * from user1")
})
thread2.start()
// Wait for both writers to finish before reading the table back.
thread1.join()
thread2.join()
spark.sql("select * from test").show()
spark.stop()
}
} {code}
error message:
{code:java}
// code placeholder
21/11/04 19:01:21 ERROR Utils: Aborting task
ExitCodeException exitCode=1: chmod: cannot access '/data/spark-examples/spark-warehouse/test/_temporary/0/_temporary/attempt_202111041901182933014038999149736_0001_m_000001_4/dt=2021-11-03/hour=10/.part-00001-95895b03-45d2-4ac6-806b-b76fd1dfa3dc.c000.snappy.parquet.crc': No such file or directory
at org.apache.hadoop.util.Shell.runCommand(Shell.java:1008)
at org.apache.hadoop.util.Shell.run(Shell.java:901)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1213)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:1307)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:1289)
at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:324)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:294)
at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:439)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:428)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:459)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:437)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:521)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:329)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:482)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:420)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:409)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:150)
at org.apache.spark.sql.execution.datasources.BaseDynamicPartitionDataWriter.renewCurrentWriter(FileFormatDataWriter.scala:290)
at org.apache.spark.sql.execution.datasources.DynamicPartitionDataSingleWriter.write(FileFormatDataWriter.scala:357)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithMetrics(FileFormatDataWriter.scala:85)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:92)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:304)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:311)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
21/11/04 19:01:21 WARN FileOutputCommitter: Could not delete file:/data/spark-examples/spark-warehouse/test/_temporary/0/_temporary/attempt_202111041901182933014038999149736_0001_m_000001_4
{code}
was:
An error occurred while concurrently writing to different static partitions.
For writing to a static partition, committerOutputPath is the location path of the table. When multiple tasks write to the same table concurrently, the _temporary path will be deleted after one task ends, causing another task to fail.
test code:
{code:java}
// code placeholder
/**
 * Reproduction program: issues two INSERT OVERWRITE statements from separate
 * threads, each targeting a different static partition of the same table
 * (`default.test`).
 */
object HiveTests {
/**
 * Sets up the session, sample data and target table, then runs the two
 * writers and prints the table contents.
 *
 * @param args command-line arguments (not read by this method)
 */
def main(args: Array[String]): Unit = {
// Local-mode session with Hive support enabled.
val spark = SparkSession
.builder()
.master("local[*]")
.appName("HiveTests")
.enableHiveSupport()
.getOrCreate()
// Sample rows; each row carries (id, name, dt, hour) to match the schema below.
val users1 = new util.ArrayList[Row]()
users1.add(Row(1, "user1", "2021-11-03", 10))
users1.add(Row(2, "user2", "2021-11-03", 10))
users1.add(Row(3, "user3", "2021-11-03", 10))
// Schema of the sample rows: four nullable columns.
val structType = StructType(Array(
StructField("id", IntegerType, true),
StructField("name", StringType, true),
StructField("dt", StringType, true),
StructField("hour", IntegerType, true)
))
// Set the Hive dynamic-partition options, then recreate the target table from scratch.
spark.sql("set hive.exec.dynamic.partition=true")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("drop table if exists default.test")
spark.sql(
"""
|create table if not exists default.test (
| id int,
| name string)
|partitioned by (dt string, hour int)
|stored as parquet
|""".stripMargin)
spark.sql("desc formatted default.test").show()
// Only id and name are exposed through the temp view; the partition values
// are supplied as literals in the INSERT statements below.
spark.sqlContext
.createDataFrame(users1, structType)
.select("id", "name")
.createOrReplaceTempView("user1")
// First writer: overwrites static partition dt='2021-11-03', hour=10.
val thread1 = new Thread(() => {
spark.sql("INSERT OVERWRITE TABLE test PARTITION(dt = '2021-11-03', hour=10) select * from user1")
})
thread1.start()
// Second writer: overwrites a different static partition (dt='2021-11-04', hour=10)
// of the same table. It is started without waiting for the first writer, so the
// two statements can run concurrently.
val thread2 = new Thread(() => {
spark.sql("INSERT OVERWRITE TABLE test PARTITION(dt = '2021-11-04', hour=10) select * from user1")
})
thread2.start()
// Wait for both writers to finish before reading the table back.
thread1.join()
thread2.join()
spark.sql("select * from test").show()
spark.stop()
}
}
{code}
error message:
{code:java}
// code placeholder
21/11/04 19:01:21 ERROR Utils: Aborting task
ExitCodeException exitCode=1: chmod: cannot access '/data/spark-examples/spark-warehouse/test/_temporary/0/_temporary/attempt_202111041901182933014038999149736_0001_m_000001_
4/dt=2021-11-03/hour=10/.part-00001-95895b03-45d2-4ac6-806b-b76fd1dfa3dc.c000.snappy.parquet.crc': No such file or directory at org.apache.hadoop.util.Shell.runCommand(Shell.java:1008)
at org.apache.hadoop.util.Shell.run(Shell.java:901)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1213)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:1307)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:1289)
at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:324)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:294)
at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:439)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:428)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:459)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:437)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:521)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:329)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:482)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:420)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:409)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:150)
at org.apache.spark.sql.execution.datasources.BaseDynamicPartitionDataWriter.renewCurrentWriter(FileFormatDataWriter.scala:290)
at org.apache.spark.sql.execution.datasources.DynamicPartitionDataSingleWriter.write(FileFormatDataWriter.scala:357)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithMetrics(FileFormatDataWriter.scala:85)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:92)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:304)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:311)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
21/11/04 19:01:21 WARN FileOutputCommitter: Could not delete file:/data/spark-examples/spark-warehouse/test/_temporary/0/_temporary/attempt_202111041901182933014038999149736_
0001_m_000001_4
{code}
> An error occurred while concurrently writing to different static partitions
> ---------------------------------------------------------------------------
>
> Key: SPARK-37210
> URL: https://issues.apache.org/jira/browse/SPARK-37210
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.1.1, 3.2.0
> Reporter: Zhen Wang
> Priority: Major
> Attachments: [SPARK-37210]_Write_to_static_partition_in_dynamic_write_mode.patch, image-2021-11-05-15-28-41-393.png
>
>
> An error occurred while concurrently writing to different static partitions.
> For writing to a static partition, committerOutputPath is the location path of the table. When multiple tasks write to the same table concurrently, the _temporary path will be deleted after one task ends, causing another task to fail.
>
> test code:
>
> {code:java}
> // code placeholder
> object HiveTests {
> def main(args: Array[String]): Unit = {
> val spark = SparkSession
> .builder()
> .master("local[*]")
> .appName("HiveTests")
> .enableHiveSupport()
> .getOrCreate()
> //rows
> val users1 = new util.ArrayList[Row]()
> users1.add(Row(1, "user1", "2021-11-03", 10))
> users1.add(Row(2, "user2", "2021-11-03", 10))
> users1.add(Row(3, "user3", "2021-11-03", 10))
> //schema
> val structType = StructType(Array(
> StructField("id", IntegerType, true),
> StructField("name", StringType, true),
> StructField("dt", StringType, true),
> StructField("hour", IntegerType, true)
> ))
> spark.sql("set hive.exec.dynamic.partition=true")
> spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
> spark.sql("drop table if exists default.test")
> spark.sql(
> """
> |create table if not exists default.test (
> | id int,
> | name string)
> |partitioned by (dt string, hour int)
> |stored as parquet
> |""".stripMargin)
> spark.sql("desc formatted default.test").show()
> spark.sqlContext
> .createDataFrame(users1, structType)
> .select("id", "name")
> .createOrReplaceTempView("user1")
> val thread1 = new Thread(() => {
> // spark.sql("INSERT INTO TABLE test PARTITION(dt = '2021-11-03', hour=10) select * from user1")
> spark.sql("INSERT OVERWRITE TABLE test PARTITION(dt = '2021-11-03', hour=10) select * from user1")
> })
> thread1.start()
> val thread2 = new Thread(() => {
> // spark.sql("INSERT INTO TABLE test PARTITION(dt = '2021-11-04', hour=10) select * from user1")
> spark.sql("INSERT OVERWRITE TABLE test PARTITION(dt = '2021-11-04', hour=10) select * from user1")
> })
> thread2.start()
> thread1.join()
> thread2.join()
> spark.sql("select * from test").show()
> spark.stop()
> }
> } {code}
>
> error message:
>
> {code:java}
> // code placeholder
> 21/11/04 19:01:21 ERROR Utils: Aborting task
> ExitCodeException exitCode=1: chmod: cannot access '/data/spark-examples/spark-warehouse/test/_temporary/0/_temporary/attempt_202111041901182933014038999149736_0001_m_000001_
> 4/dt=2021-11-03/hour=10/.part-00001-95895b03-45d2-4ac6-806b-b76fd1dfa3dc.c000.snappy.parquet.crc': No such file or directory at org.apache.hadoop.util.Shell.runCommand(Shell.java:1008)
> at org.apache.hadoop.util.Shell.run(Shell.java:901)
> at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1213)
> at org.apache.hadoop.util.Shell.execCommand(Shell.java:1307)
> at org.apache.hadoop.util.Shell.execCommand(Shell.java:1289)
> at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
> at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:324)
> at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:294)
> at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:439)
> at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:428)
> at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:459)
> at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:437)
> at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:521)
> at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
> at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
> at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:329)
> at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:482)
> at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:420)
> at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:409)
> at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
> at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:150)
> at org.apache.spark.sql.execution.datasources.BaseDynamicPartitionDataWriter.renewCurrentWriter(FileFormatDataWriter.scala:290)
> at org.apache.spark.sql.execution.datasources.DynamicPartitionDataSingleWriter.write(FileFormatDataWriter.scala:357)
> at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithMetrics(FileFormatDataWriter.scala:85)
> at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:92)
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:304)
> at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:311)
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:131)
> at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> 21/11/04 19:01:21 WARN FileOutputCommitter: Could not delete file:/data/spark-examples/spark-warehouse/test/_temporary/0/_temporary/attempt_202111041901182933014038999149736_
> 0001_m_000001_4
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org