Posted to dev@spark.apache.org by Ajith shetty <aj...@huawei.com> on 2018/01/08 06:31:01 UTC

Spark SQL : Exception on concurrent insert due to lease over _SUCCESS

Hi all

I am using Spark 2.1 and hit an exception when running concurrent inserts into a table. Here is my scenario and some analysis.

create table sample using csv options('path' '/tmp/f/')

When concurrent inserts are executed, we see an exception like the one below:

2017-12-29 13:41:11,117 | ERROR | main | Aborting job null. | org.apache.spark.internal.Logging$class.logError(Logging.scala:91)
org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): No lease on /tmp/f/_SUCCESS (inode 1032508): File does not exist. Holder DFSClient_NONMAPREDUCE_8638078_1 does not have any open files.
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3466)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3562)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3525)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:917)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:573)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:973)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2260)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2256)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1778)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2254)

        at org.apache.hadoop.ipc.Client.call(Client.java:1524)
        at org.apache.hadoop.ipc.Client.call(Client.java:1460)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
        at com.sun.proxy.$Proxy14.complete(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:480)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:202)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
        at com.sun.proxy.$Proxy15.complete(Unknown Source)
        at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:887)
        at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:861)
        at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:822)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:336)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:128)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2$$anonfun$apply$mcV$sp$1.apply$mcV$sp(FileFormatWriter.scala:167)
        at org.apache.spark.util.Utils$.proxyOperate(Utils.scala:2706)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply$mcV$sp(FileFormatWriter.scala:166)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply(FileFormatWriter.scala:144)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply(FileFormatWriter.scala:144)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:144)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:59)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:57)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:75)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:125)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:125)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:600)
        at $line48.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:24)
        at $line48.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:29)
        at $line48.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:31)
        at $line48.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:33)
        at $line48.$read$$iw$$iw$$iw$$iw.<init>(<console>:35)
        at $line48.$read$$iw$$iw$$iw.<init>(<console>:37)
        at $line48.$read$$iw$$iw.<init>(<console>:39)
        at $line48.$read$$iw.<init>(<console>:41)
        at $line48.$read.<init>(<console>:43)
        at $line48.$read$.<init>(<console>:47)
        at $line48.$read$.<clinit>(<console>)
        at $line48.$eval$.$print$lzycompute(<console>:7)
        at $line48.$eval$.$print(<console>:6)
        at $line48.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
        at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
        at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
        at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
        at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
        at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
        at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
        at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:415)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
        at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
        at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
        at org.apache.spark.repl.Main$.doMain(Main.scala:69)
        at org.apache.spark.repl.Main$.main(Main.scala:52)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:761)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:190)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:215)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:129)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
org.apache.spark.SparkException: Job aborted.
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply$mcV$sp(FileFormatWriter.scala:179)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply(FileFormatWriter.scala:144)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply(FileFormatWriter.scala:144)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:144)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:59)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:57)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:75)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:125)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:125)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:600)
  ... 48 elided


Basic analysis:

The _SUCCESS file is used by the MapReduce framework to mark successful jobs (mapreduce.fileoutputcommitter.marksuccessfuljobs, default true). It is created by org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#commitJob, which is reached via org.apache.spark.sql.execution.datasources.FileFormatWriter#write:

            if (context.getConfiguration().getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
                Path markerPath = new Path(this.outputPath, "_SUCCESS");
                fs.create(markerPath).close();
            }


The _SUCCESS file is created by org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter in the output path specified via mapreduce.output.fileoutputformat.outputdir in the job configuration.

Let's take the example of
create table sample using csv options('path' '/tmp/f/')

so the data files created by every insert are placed in /tmp/f/

mapreduce.output.fileoutputformat.outputdir is set by Spark in org.apache.spark.sql.execution.datasources.FileFormatWriter#write and is used by the committer for 2 purposes:
1. to create part files with data under this folder (first in a _temporary folder, then moved to the output folder on commitJob)
2. to create the _SUCCESS file on job completion
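
To make the two uses of the output directory concrete, here is a minimal sketch on the local filesystem using only java.nio. It is not the FileOutputCommitter code: the class name, the writeAndCommit helper, the per-job staging folder and the part file naming are all assumptions made for illustration. What it shows is that part files get distinct names, while the marker always has the same fixed name in the shared output folder.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;

public class CommitFlowSketch {

    // Hypothetical helper (not a Hadoop or Spark API): stages one part file
    // under <outputDir>/_temporary/<jobId>, then "commits" the job.
    static void writeAndCommit(Path outputDir, String jobId, String data) throws IOException {
        // Purpose 1, first half: output is written to a staging folder.
        Path staging = outputDir.resolve("_temporary").resolve(jobId);
        Files.createDirectories(staging);
        Path staged = staging.resolve("part-00000-" + jobId + ".csv");
        Files.write(staged, data.getBytes(StandardCharsets.UTF_8));

        // Purpose 1, second half: on commit the part file moves to the output folder.
        Files.move(staged, outputDir.resolve(staged.getFileName()),
                StandardCopyOption.REPLACE_EXISTING);
        Files.delete(staging);

        // Purpose 2: the marker name is fixed, so every job that targets
        // outputDir creates (and overwrites) the very same file.
        Files.write(outputDir.resolve("_SUCCESS"), new byte[0]);
    }

    public static void main(String[] args) throws IOException {
        Path out = Files.createTempDirectory("commit-sketch");
        writeAndCommit(out, "job1", "a,1\n");
        writeAndCommit(out, "job2", "b,2\n");
        // List what ended up in the shared output folder.
        try (Stream<Path> files = Files.list(out)) {
            files.forEach(p -> System.out.println(p.getFileName()));
        }
    }
}
```

In this sketch two sequential jobs leave two part files but only one _SUCCESS, which is the shared file that concurrent jobs contend for.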

If 2 applications insert into the same table concurrently, both try to create _SUCCESS when they commit on job completion, which results in a race condition (in our example the close() call on /tmp/f/_SUCCESS failed). HDFS grants the lease on a file to only one HDFS client at a time, so the other client fails.
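
The interleaving can be replayed with a toy model in plain Java. This is only a sketch under assumptions: ToyNameNode and its create/complete methods are made-up stand-ins for the NameNode bookkeeping, not HDFS classes, and the model assumes that the second create on the same path replaces the file the first client still has open (fs.create(path) overwrites by default).

```java
import java.util.HashMap;
import java.util.Map;

public class SuccessMarkerRace {

    /** Toy stand-in for the NameNode: tracks files that are still open for write. */
    static class ToyNameNode {
        private static final class OpenFile {
            final long inodeId;
            final String holder;
            OpenFile(long inodeId, String holder) {
                this.inodeId = inodeId;
                this.holder = holder;
            }
        }

        private final Map<String, OpenFile> openFiles = new HashMap<>();
        private long nextInodeId = 1;

        /** Create with overwrite: the path now points at a new inode leased to this client. */
        synchronized long create(String path, String client) {
            long id = nextInodeId++;
            openFiles.put(path, new OpenFile(id, client));
            return id;
        }

        /** Called on close(): succeeds only if the caller still holds the lease on that inode. */
        synchronized void complete(String path, long inodeId, String client) {
            OpenFile f = openFiles.get(path);
            if (f == null || f.inodeId != inodeId || !f.holder.equals(client)) {
                throw new IllegalStateException("No lease on " + path + " (inode " + inodeId
                        + "): " + client + " does not have any open files.");
            }
            openFiles.remove(path); // file is closed, lease released
        }
    }

    /** Replays the race; returns true if the first writer's close fails. */
    static boolean firstWriterFails() {
        ToyNameNode nn = new ToyNameNode();
        String path = "/tmp/f/_SUCCESS";
        long inodeA = nn.create(path, "clientA"); // job A starts writing the marker
        long inodeB = nn.create(path, "clientB"); // job B overwrites it before A closes
        nn.complete(path, inodeB, "clientB");     // B closes successfully
        try {
            nn.complete(path, inodeA, "clientA"); // A's inode is gone, so close fails
            return false;
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
            return true;
        }
    }

    public static void main(String[] args) {
        firstWriterFails();
    }
}
```

The model ignores everything else HDFS does (lease renewal, recovery, blocks); it only illustrates why the client that created the marker first can fail on close() while the other one succeeds.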

As mentioned, _SUCCESS is created by MapReduce code. Spark could turn this off by setting mapreduce.fileoutputcommitter.marksuccessfuljobs = false in org.apache.spark.internal.io.HadoopMapReduceCommitProtocol#setupJob.

_SUCCESS seems to be used only by frameworks like Oozie for file processing (refer to https://books.google.co.in/books?id=HAY_CQAAQBAJ&pg=PA119&lpg=PA119&dq=Oozie+dependencies+on+_SUCCESS+file&source=bl&ots=RTr3hP0Cjj&sig=3B2yk24ebZt42SQo8O42eOX6OCI&hl=en&sa=X&ved=0ahUKEwjj053i5cDYAhVIr48KHUIGCZQQ6AEIYjAI#v=onepage&q=Oozie%20dependencies%20on%20_SUCCESS%20file&f=false).

So setting mapreduce.fileoutputcommitter.marksuccessfuljobs = false in org.apache.spark.internal.io.HadoopMapReduceCommitProtocol#setupJob should be OK? Will it have any impact? I do not see _SUCCESS being used by Spark. I am new to Spark, so please correct me if any of the analysis is wrong :)

Regards
Ajith