Posted to issues@spark.apache.org by "Lev Katzav (JIRA)" <ji...@apache.org> on 2019/03/02 10:42:00 UTC

[jira] [Created] (SPARK-27030) DataFrameWriter.insertInto fails when writing in parallel to a hive table

Lev Katzav created SPARK-27030:
----------------------------------

             Summary: DataFrameWriter.insertInto fails when writing in parallel to a hive table
                 Key: SPARK-27030
                 URL: https://issues.apache.org/jira/browse/SPARK-27030
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.4.0
            Reporter: Lev Katzav


When writing to a Hive table, the following temp directory is used:
{code:java}
/path/to/table/_temporary/0/{code}
The 0 at the end comes from the config
{code:java}
"mapreduce.job.application.attempt.id"{code}
Since that config is not set, it falls back to 0.
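
To illustrate the fallback described above, here is a minimal sketch of how the attempt directory name is derived. It is a simplified model, not the Hadoop code itself: the object name AttemptPath, the helper jobAttemptPath, and the use of a plain Map in place of a Hadoop Configuration are all assumptions made for illustration.
{code:java}
// Simplified model of the temp directory derivation (hypothetical helper,
// not part of Hadoop or Spark).
object AttemptPath {
  val AttemptIdKey = "mapreduce.job.application.attempt.id"

  // conf stands in for the Hadoop job configuration.
  def jobAttemptPath(tablePath: String, conf: Map[String, String]): String = {
    // When the key is absent, the attempt id defaults to 0.
    val attemptId = conf.get(AttemptIdKey).map(_.toInt).getOrElse(0)
    s"$tablePath/_temporary/$attemptId"
  }
}
{code}
With an empty configuration, every writer to the same table resolves to the same /path/to/table/_temporary/0 directory, which is the shared state behind the failure.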

When two processes write to the same table, the following race condition can occur:
 # p1 creates temp folder and uses it
 # p2 uses temp folder
 # p1 finishes and deletes temp folder
 # p2 fails since temp folder is missing
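
The four steps above can be replayed deterministically on the local filesystem, without Spark. This is only a sketch of the interleaving: the object SharedTempDirRace, its helpers, and the file names p1-part and p2-part are hypothetical, and the two "processes" are simulated sequentially in one thread.
{code:java}
import java.io.File
import java.nio.file.Files

// Hypothetical stand-in for two writers sharing <table>/_temporary/0.
object SharedTempDirRace {
  // Delete a file or a directory tree.
  def deleteRecursively(f: File): Unit = {
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }

  // Returns whether p2 still sees its temp directory at commit time.
  def run(): Boolean = {
    val table = Files.createTempDirectory("table").toFile
    val temp = new File(table, "_temporary/0")

    // 1. p1 creates the temp folder and uses it
    temp.mkdirs()
    new File(temp, "p1-part").createNewFile()

    // 2. p2 uses the same temp folder
    new File(temp, "p2-part").createNewFile()

    // 3. p1 finishes and deletes the temp folder
    deleteRecursively(new File(table, "_temporary"))

    // 4. p2 goes to commit, but the folder (and its output) is gone
    val p2SeesTempDir = temp.exists()

    deleteRecursively(table) // clean up the scratch directory
    p2SeesTempDir
  }
}
{code}
Here run() returns false: by the time p2 looks for its output, p1 has already removed the shared directory.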

 

This error can be reproduced locally with the following code.
(The code runs locally, but I saw the same error on a cluster
with two jobs writing to the same table.)
{code:java}
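import scala.collection.parallel.ForkJoinTaskSupport
// the stack trace below shows scala.concurrent.forkjoin, i.e. the Scala 2.11 pool type
import scala.concurrent.forkjoin.ForkJoinPool

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._

val df = spark
  .range(1000)
  .toDF("a")
  .withColumn("partition", lit(0))
  .cache()

// create db
sqlContext.sql("CREATE DATABASE IF NOT EXISTS db").count()

// create table
df
  .write
  .partitionBy("partition")
  .saveAsTable("db.table")

// run up to 10 inserts concurrently
val x = (1 to 100).par
x.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(10))

// insert to different partitions in parallel
x.foreach { p =>
  val df2 = df.withColumn("partition", lit(p))
  df2
    .write
    .mode(SaveMode.Overwrite)
    .insertInto("db.table")
}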
{code}
 

The resulting error is:
{code:java}
java.io.FileNotFoundException: File file:/path/to/warehouse/db.db/table/_temporary/0 does not exist
 at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:406)
 at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1497)
 at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1537)
 at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:669)
 at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1497)
 at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1537)
 at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:283)
 at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:325)
 at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
 at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:185)
 at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
 at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
 at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
 at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
 at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
 at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
 at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
 at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
 at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
 at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
 at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
 at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
 at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
 at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
 at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
 at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:325)
 at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:311)
 at company.name.spark.hive.SparkHiveUtilsTest$$anonfun$3$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(SparkHiveUtilsTest.scala:190)
 at scala.collection.parallel.immutable.ParRange$ParRangeIterator.foreach(ParRange.scala:91)
 at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
 at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
 at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
 at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
 at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
 at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
 at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
 at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
 at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
 at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
 at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107){code}
 

A possible workaround that I found is to set the config "mapreduce.job.application.attempt.id"
to a random integer for each job in SparkConf, so that each job writes to a different path. However, this does not help when the jobs share a single Spark context, since the config is then the same for all of them.
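
A rough sketch of that workaround for separate applications follows. It assumes the Hadoop key is passed through SparkConf using Spark's spark.hadoop. prefix; the exact way the key was set is not stated above, so treat the property name and the variable names as assumptions.
{code:java}
import scala.util.Random

import org.apache.spark.SparkConf

// Pick a non-negative attempt id per application (assumption: one id per
// SparkConf, so it does not help for jobs sharing one Spark context).
val attemptId = Random.nextInt(Int.MaxValue)

// Assumption: "spark.hadoop."-prefixed entries are copied into the Hadoop
// configuration that the output committer reads.
val conf = new SparkConf()
  .set("spark.hadoop.mapreduce.job.application.attempt.id", attemptId.toString)
{code}
Two applications can still draw the same random id, so this reduces the chance of a collision rather than removing it.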

 

 


