Posted to issues@spark.apache.org by "Lev Katzav (JIRA)" <ji...@apache.org> on 2019/03/02 10:42:00 UTC
[jira] [Created] (SPARK-27030) DataFrameWriter.insertInto fails
when writing in parallel to a hive table
Lev Katzav created SPARK-27030:
----------------------------------
Summary: DataFrameWriter.insertInto fails when writing in parallel to a hive table
Key: SPARK-27030
URL: https://issues.apache.org/jira/browse/SPARK-27030
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.4.0
Reporter: Lev Katzav
When writing to a Hive table, the following temporary directory is used:
{code:java}
/path/to/table/_temporary/0/{code}
(the trailing 0 comes from the config
{code:java}
"mapreduce.job.application.attempt.id"{code}
Since that config is not set, the value falls back to 0.)
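To make the path derivation concrete, here is a minimal sketch of the fallback described above. It is a simplified model, not the Hadoop implementation: `jobAttemptPath` is a hypothetical helper and the config is represented as a plain `Map`.

```scala
// Simplified model of how the job-attempt directory is derived.
// `jobAttemptPath` is a hypothetical helper, not a Hadoop or Spark API.
val AttemptIdKey = "mapreduce.job.application.attempt.id"

def jobAttemptPath(tablePath: String, conf: Map[String, String]): String = {
  // Fall back to 0 when the attempt id is not configured.
  val attemptId = conf.get(AttemptIdKey).map(_.toInt).getOrElse(0)
  s"$tablePath/_temporary/$attemptId"
}

// Two independent jobs without the config resolve to the same directory.
val jobA = jobAttemptPath("/path/to/table", Map.empty)
val jobB = jobAttemptPath("/path/to/table", Map.empty)

// A job with an explicit attempt id gets its own directory.
val jobC = jobAttemptPath("/path/to/table", Map(AttemptIdKey -> "7"))
```

Because `jobA` and `jobB` are identical, nothing in the path distinguishes one writer from another.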
When two processes write to the same table, the following race condition can occur:
# p1 creates the temp folder and starts using it
# p2 starts using the same temp folder
# p1 finishes and deletes the temp folder
# p2 fails because the temp folder is missing
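The interleaving above can be replayed deterministically on a local file system, without Spark. This is only an illustrative sketch: the two "processes" are simulated sequentially in one thread, the table location is a throwaway temp directory, and `deleteTree` is a hypothetical helper.

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical helper: delete a file or a directory tree, children first.
def deleteTree(f: File): Unit = {
  Option(f.listFiles).foreach(_.foreach(deleteTree))
  f.delete()
  ()
}

// Stand-in for the table location; both writers share _temporary/0.
val table = Files.createTempDirectory("table").toFile
val sharedTemp = new File(table, "_temporary/0")

// 1. p1 creates the temp folder and writes into it.
sharedTemp.mkdirs()
new File(sharedTemp, "p1-task").createNewFile()

// 2. p2 finds the folder already present and writes into it as well.
new File(sharedTemp, "p2-task").createNewFile()

// 3. p1 finishes and deletes the temp folder, including p2's file.
deleteTree(sharedTemp)

// 4. p2 tries to commit, but the folder it needs to list is gone.
val p2StillHasTemp = sharedTemp.exists()

// Remove the throwaway table directory.
deleteTree(table)
```

After step 3, `p2StillHasTemp` is false, which corresponds to the missing-directory failure in step 4 of the list.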
The error can be reproduced locally with the following code.
(The code runs locally, but I saw the same error on a cluster with 2 jobs writing to the same table.)
{code:java}
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._

val df = spark
  .range(1000)
  .toDF("a")
  .withColumn("partition", lit(0))
  .cache()

// create db
spark.sql("CREATE DATABASE IF NOT EXISTS db")

// create table
df
  .write
  .partitionBy("partition")
  .saveAsTable("db.table")

// run up to 10 inserts concurrently
val x = (1 to 100).par
x.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(10))

// insert to different partitions in parallel
x.foreach { p =>
  val df2 = df
    .withColumn("partition", lit(p))
  df2
    .write
    .mode(SaveMode.Overwrite)
    .insertInto("db.table")
}
{code}
The resulting error is:
{code:java}
java.io.FileNotFoundException: File file:/path/to/warehouse/db.db/table/_temporary/0 does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:406)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1497)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1537)
at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:669)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1497)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1537)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:283)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:325)
at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:185)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:325)
at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:311)
at company.name.spark.hive.SparkHiveUtilsTest$$anonfun$3$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(SparkHiveUtilsTest.scala:190)
at scala.collection.parallel.immutable.ParRange$ParRangeIterator.foreach(ParRange.scala:91)
at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107){code}
A possible workaround I found is to set the config "mapreduce.job.application.attempt.id" to a random integer in the SparkConf of every job, so that each job writes to a different path. That does not help when the jobs share a single Spark context, since they all see the same config value.
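A minimal sketch of generating such a setting follows. It assumes the value is passed through SparkConf using the "spark.hadoop." prefix, which Spark copies into the Hadoop configuration; whether the original workaround used that prefix is an assumption, and `randomAttemptIdSetting` is a hypothetical helper.

```scala
import scala.util.Random

// Assumption: the "spark.hadoop." prefix is used to forward the Hadoop
// setting through SparkConf.
val attemptIdKey = "spark.hadoop.mapreduce.job.application.attempt.id"

// Hypothetical helper: one random, non-negative attempt id per application.
def randomAttemptIdSetting(rng: Random = new Random()): (String, String) =
  attemptIdKey -> rng.nextInt(Int.MaxValue).toString

val (key, value) = randomAttemptIdSetting()
// With Spark on the classpath, apply it before creating the context:
//   new SparkConf().set(key, value)
```

Random ids make a collision between two applications unlikely but not impossible, and, as noted above, they do nothing for concurrent writes issued from one Spark context.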