Posted to dev@spark.apache.org by Ajith shetty <aj...@huawei.com> on 2018/01/08 06:31:01 UTC
Spark SQL: Exception on concurrent insert due to lease over _SUCCESS
Hi all,
I am using Spark 2.1 and I hit an exception when doing concurrent inserts into a table. Here is my scenario and some analysis.
create table sample using csv options('path' '/tmp/f/')
When concurrent inserts are executed, we see an exception like the one below:
2017-12-29 13:41:11,117 | ERROR | main | Aborting job null. | org.apache.spark.internal.Logging$class.logError(Logging.scala:91)
org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): No lease on /tmp/f/_SUCCESS (inode 1032508): File does not exist. Holder DFSClient_NONMAPREDUCE_8638078_1 does not have any open files.
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3466)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3562)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3525)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:917)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:573)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:973)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2260)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2256)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1778)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2254)
at org.apache.hadoop.ipc.Client.call(Client.java:1524)
at org.apache.hadoop.ipc.Client.call(Client.java:1460)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
at com.sun.proxy.$Proxy14.complete(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:480)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:202)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
at com.sun.proxy.$Proxy15.complete(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:887)
at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:861)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:822)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:336)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:128)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2$$anonfun$apply$mcV$sp$1.apply$mcV$sp(FileFormatWriter.scala:167)
at org.apache.spark.util.Utils$.proxyOperate(Utils.scala:2706)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply$mcV$sp(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply(FileFormatWriter.scala:144)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply(FileFormatWriter.scala:144)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:144)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:59)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:75)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:125)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:125)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:600)
at $line48.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:24)
at $line48.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:29)
at $line48.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:31)
at $line48.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:33)
at $line48.$read$$iw$$iw$$iw$$iw.<init>(<console>:35)
at $line48.$read$$iw$$iw$$iw.<init>(<console>:37)
at $line48.$read$$iw$$iw.<init>(<console>:39)
at $line48.$read$$iw.<init>(<console>:41)
at $line48.$read.<init>(<console>:43)
at $line48.$read$.<init>(<console>:47)
at $line48.$read$.<clinit>(<console>)
at $line48.$eval$.$print$lzycompute(<console>:7)
at $line48.$eval$.$print(<console>:6)
at $line48.$eval.$print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:415)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
at org.apache.spark.repl.Main$.doMain(Main.scala:69)
at org.apache.spark.repl.Main$.main(Main.scala:52)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:761)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:190)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:215)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:129)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply$mcV$sp(FileFormatWriter.scala:179)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply(FileFormatWriter.scala:144)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply(FileFormatWriter.scala:144)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:144)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:59)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:75)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:125)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:125)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:600)
... 48 elided
Basic analysis:
The _SUCCESS file is used by the MapReduce framework to mark successful jobs (mapreduce.fileoutputcommitter.marksuccessfuljobs, which defaults to true). It is created by org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#commitJob, invoked via org.apache.spark.sql.execution.datasources.FileFormatWriter#write:
if (context.getConfiguration().getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
Path markerPath = new Path(this.outputPath, "_SUCCESS");
fs.create(markerPath).close();
}
The _SUCCESS file is created by org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter under the output path specified via mapreduce.output.fileoutputformat.outputdir in the job configuration.
Let's take the example of
create table sample using csv options('path' '/tmp/f/')
Here the data files created by every insert end up in /tmp/f/.
mapreduce.output.fileoutputformat.outputdir, passed by Spark at org.apache.spark.sql.execution.datasources.FileFormatWriter#write, is used by the committer for two purposes:
1. to create part files with data under this folder (first in the _temporary folder, then moved to the output folder on commitJob)
2. to create the _SUCCESS file on job completion
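To make the two steps concrete, here is a small local-filesystem sketch of the commit sequence (my own simplification using java.nio, NOT the real FileOutputCommitter code):

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.ArrayList;
import java.util.List;

// Simplified local-filesystem sketch of the two commitJob steps above
// (illustration only, not the actual Hadoop implementation).
class CommitSketch {
    static void commitJob(Path outputDir) throws IOException {
        Path tmp = outputDir.resolve("_temporary");
        if (Files.isDirectory(tmp)) {
            // Step 1: promote part files written under _temporary
            // into the output folder.
            List<Path> parts = new ArrayList<>();
            try (DirectoryStream<Path> ds = Files.newDirectoryStream(tmp)) {
                for (Path p : ds) parts.add(p);
            }
            for (Path p : parts) {
                Files.move(p, outputDir.resolve(p.getFileName()),
                        StandardCopyOption.REPLACE_EXISTING);
            }
            Files.delete(tmp);
        }
        // Step 2: create the _SUCCESS marker. On HDFS this create()+close()
        // is exactly the call that fails when another job's commit races it.
        Files.createFile(outputDir.resolve("_SUCCESS"));
    }

    public static void main(String[] args) throws IOException {
        Path out = Files.createTempDirectory("commit-sketch");
        Files.createDirectory(out.resolve("_temporary"));
        Files.createFile(out.resolve("_temporary").resolve("part-00000"));
        commitJob(out);
        System.out.println(Files.exists(out.resolve("part-00000")));  // true
        System.out.println(Files.exists(out.resolve("_SUCCESS")));    // true
    }
}
```

On a local filesystem both steps always succeed; the failure in the stack trace only appears because HDFS enforces single-writer semantics on step 2.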
If two applications try to insert into the same table concurrently, their jobs race on _SUCCESS when they commit on completion (in our example the close() call on /tmp/f/_SUCCESS failed). HDFS grants the lease on a file to only one client at a time, so the other client fails.
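The lease behaviour can be modelled with a toy in-memory sketch (my own illustration of HDFS's single-writer semantics, NOT actual HDFS code): the second job's create() takes over the lease, so the first job's close() fails with the same "No lease" shape as the exception above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy in-memory model of HDFS single-writer lease semantics
// (illustration only, not HDFS code).
class LeaseModel {
    private final Map<String, String> leaseHolder = new ConcurrentHashMap<>();

    void create(String path, String client) {
        // Whoever created the file last holds the lease.
        leaseHolder.put(path, client);
    }

    void close(String path, String client) {
        if (!client.equals(leaseHolder.get(path))) {
            throw new IllegalStateException("No lease on " + path
                    + ": holder " + client + " does not have any open files.");
        }
        leaseHolder.remove(path);
    }

    public static void main(String[] args) {
        LeaseModel fs = new LeaseModel();
        fs.create("/tmp/f/_SUCCESS", "job1");    // job1's commit creates the marker
        fs.create("/tmp/f/_SUCCESS", "job2");    // job2's commit takes over the lease
        try {
            fs.close("/tmp/f/_SUCCESS", "job1"); // job1's close() now fails
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        fs.close("/tmp/f/_SUCCESS", "job2");     // job2 commits fine
    }
}
```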
As mentioned, _SUCCESS is created by MapReduce code; Spark could turn this off by setting mapreduce.fileoutputcommitter.marksuccessfuljobs = false in org.apache.spark.internal.io.HadoopMapReduceCommitProtocol#setupJob.
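As an immediate per-application workaround (separate from changing setupJob), the flag can already be passed through Spark's spark.hadoop.* passthrough, which copies the property into the job's Hadoop Configuration:

```shell
# Workaround sketch: disable the _SUCCESS marker for this application only,
# so FileOutputCommitter#commitJob skips creating /tmp/f/_SUCCESS.
spark-shell --conf spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs=false
```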
As far as I can tell, _SUCCESS is only consumed by external frameworks such as Oozie for file-based triggering (refer https://books.google.co.in/books?id=HAY_CQAAQBAJ&pg=PA119&lpg=PA119&dq=Oozie+dependencies+on+_SUCCESS+file&source=bl&ots=RTr3hP0Cjj&sig=3B2yk24ebZt42SQo8O42eOX6OCI&hl=en&sa=X&ved=0ahUKEwjj053i5cDYAhVIr48KHUIGCZQQ6AEIYjAI#v=onepage&q=Oozie%20dependencies%20on%20_SUCCESS%20file&f=false)
So, would setting mapreduce.fileoutputcommitter.marksuccessfuljobs = false at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol#setupJob be OK? Would it have any impact? I do not see _SUCCESS being used by Spark itself. I am new to Spark, so please correct me if any of this analysis is wrong :)
Regards
Ajith