Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:04:24 UTC

[jira] [Updated] (SPARK-22910) Wrong results in Spark Job because failed to move to Trash

     [ https://issues.apache.org/jira/browse/SPARK-22910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-22910:
---------------------------------
    Labels: bulk-closed  (was: )

> Wrong results in Spark Job because failed to move to Trash
> ----------------------------------------------------------
>
>                 Key: SPARK-22910
>                 URL: https://issues.apache.org/jira/browse/SPARK-22910
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Ohad Raviv
>            Priority: Major
>              Labels: bulk-closed
>
> Our Spark job completed with a successful status even though the data it saved was corrupted.
> We have a monthly job in which each run overwrites the output of the previous run. Between two runs we changed spark.sql.shuffle.partitions from 2000 to 1000. In the new run, moving the old output to the user's .Trash failed with only a WARN, because the user's space quota was already full. Since it was just a warning, the process kept going: it wrote the new 1000 part files but left most of the old run's remaining 1000 part files in place. The final output folder therefore contained a mix of old and new data, which corrupted the downstream process.
> The post-mortem is relatively easy to understand:
> {code}
> hadoop fs -ls /the/folder
> -rwxr-xr-x 3 spark_user spark_user 209012005 2017-12-10 14:20 /the/folder/part-00000.gz 
> .
> .
> -rwxr-xr-x 3 spark_user spark_user 111134899 2017-11-17 06:39 /the/folder/part-01990.gz 
> {code}
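> A quick consistency check along these lines would have caught the mixed directory before anything downstream read it. This is only a hedged sketch using the Hadoop FileSystem API; the path is the one from the listing above, and the 6-hour threshold is an assumption of mine, not something from the job:
> {code}
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileStatus;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
>
> public class MixedOutputCheck {
>     public static void main(String[] args) throws Exception {
>         Path outputDir = new Path("/the/folder");       // folder from the listing above
>         long maxSpreadMillis = 6L * 60 * 60 * 1000;     // assumed threshold for one write "generation"
>
>         FileSystem fs = FileSystem.get(new Configuration());
>         long oldest = Long.MAX_VALUE, newest = Long.MIN_VALUE;
>         for (FileStatus status : fs.listStatus(outputDir)) {
>             if (!status.isFile()) continue;
>             oldest = Math.min(oldest, status.getModificationTime());
>             newest = Math.max(newest, status.getModificationTime());
>         }
>         // part-00000.gz (2017-12-10) next to part-01990.gz (2017-11-17) would trip this check
>         if (newest - oldest > maxSpreadMillis) {
>             throw new IllegalStateException("Output " + outputDir + " mixes files written "
>                     + ((newest - oldest) / 3600000L) + " hours apart; the previous run's files"
>                     + " were probably not removed.");
>         }
>         System.out.println("Output " + outputDir + " looks consistent.");
>     }
> }
> {code}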
> And in the driver's log:
> {code}
> 17/12/10 15:10:00 WARN Hive: Directory hdfs:///the/folder cannot be removed: java.io.IOException: Failed to move to trash: hdfs:///the/folder/part-00000.gz
> java.io.IOException: Failed to move to trash: hdfs:///the/folder/part-00000.gz
> 	at org.apache.hadoop.fs.TrashPolicyDefault.moveToTrash(TrashPolicyDefault.java:160)
> 	at org.apache.hadoop.fs.Trash.moveToTrash(Trash.java:109)
> 	at org.apache.hadoop.fs.Trash.moveToAppropriateTrash(Trash.java:90)
> 	at org.apache.hadoop.hive.shims.Hadoop23Shims.moveToAppropriateTrash(Hadoop23Shims.java:272)
> 	at org.apache.hadoop.hive.common.FileUtils.moveToTrash(FileUtils.java:603)
> 	at org.apache.hadoop.hive.common.FileUtils.trashFilesUnderDir(FileUtils.java:586)
> 	at org.apache.hadoop.hive.ql.metadata.Hive.replaceFiles(Hive.java:2851)
> 	at org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1640)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.spark.sql.hive.client.Shim_v0_14.loadTable(HiveShim.scala:716)
> 	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadTable$1.apply$mcV$sp(HiveClientImpl.scala:672)
> 	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadTable$1.apply(HiveClientImpl.scala:672)
> 	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadTable$1.apply(HiveClientImpl.scala:672)
> 	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:283)
> 	at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:230)
> 	at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:229)
> 	at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:272)
> 	at org.apache.spark.sql.hive.client.HiveClientImpl.loadTable(HiveClientImpl.scala:671)
> 	at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadTable$1.apply$mcV$sp(HiveExternalCatalog.scala:741)
> 	at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadTable$1.apply(HiveExternalCatalog.scala:739)
> 	at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadTable$1.apply(HiveExternalCatalog.scala:739)
> 	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:95)
> 	at org.apache.spark.sql.hive.HiveExternalCatalog.loadTable(HiveExternalCatalog.scala:739)
> 	at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:323)
> 	at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:170)
> 	at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:347)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
> 	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
> 	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
> 	at org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand.run(CreateHiveTableAsSelectCommand.scala:92)
> 	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
> 	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
> 	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
> 	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
> 	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
> 	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185)
> 	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
> 	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at 
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
> Caused by: org.apache.hadoop.hdfs.protocol.DSQuotaExceededException: The DiskSpace quota of /user/spark_user is exceeded: quota = 3298534883328 B = 3 TB but diskspace consumed = 3298551987606 B = 3.00 TB
> 	at org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature.verifyStoragespaceQuota(DirectoryWithQuotaFeature.java:211)
> 	at org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature.verifyQuota(DirectoryWithQuotaFeature.java:239)
> 	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.verifyQuota(FSDirectory.java:945)
> 	at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.verifyQuotaForRename(FSDirRenameOp.java:102)
> 	at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.unprotectedRenameTo(FSDirRenameOp.java:190)
> 	at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameTo(FSDirRenameOp.java:474)
> 	at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameToInt(FSDirRenameOp.java:73)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renameTo(FSNamesystem.java:3768)
> 	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.rename(NameNodeRpcServer.java:986)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.rename(ClientNamenodeProtocolServerSideTranslatorPB.java:583)
> 	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
> 	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2313)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2309)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
> 	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2307)
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> 	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> 	at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
> 	at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
> 	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1855)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:575)
> 	at org.apache.hadoop.fs.TrashPolicyDefault.moveToTrash(TrashPolicyDefault.java:154)
> 	... 70 more
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.DSQuotaExceededException): The DiskSpace quota of /user/spark_user is exceeded: quota = 3298534883328 B = 3 TB but diskspace consumed = 3298551987606 B = 3.00 TB
> 	at org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature.verifyStoragespaceQuota(DirectoryWithQuotaFeature.java:211)
> 	at org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature.verifyQuota(DirectoryWithQuotaFeature.java:239)
> 	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.verifyQuota(FSDirectory.java:945)
> 	at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.verifyQuotaForRename(FSDirRenameOp.java:102)
> 	at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.unprotectedRenameTo(FSDirRenameOp.java:190)
> 	at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameTo(FSDirRenameOp.java:474)
> 	at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameToInt(FSDirRenameOp.java:73)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renameTo(FSNamesystem.java:3768)
> 	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.rename(NameNodeRpcServer.java:986)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.rename(ClientNamenodeProtocolServerSideTranslatorPB.java:583)
> 	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
> 	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2313)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2309)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
> 	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2307)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1469)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1400)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
> 	at com.sun.proxy.$Proxy9.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:468)
> 	at sun.reflect.GeneratedMethodAccessor70.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy10.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1853)
> {code}
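> The root cause in the log is the DSQuotaExceededException on /user/spark_user, i.e. the quota of the directory that holds .Trash was already full. A pre-flight check could fail the job before the overwrite even starts; this is only a hedged sketch using the Hadoop ContentSummary API, reusing the paths from this report for illustration (it is not part of our job):
> {code}
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.ContentSummary;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
>
> public class TrashQuotaPrecheck {
>     public static void main(String[] args) throws Exception {
>         FileSystem fs = FileSystem.get(new Configuration());
>
>         Path trashHome = new Path("/user/spark_user");  // quota'd home dir from the log above
>         Path outputDir = new Path("/the/folder");       // data the overwrite will move to .Trash
>
>         ContentSummary home = fs.getContentSummary(trashHome);
>         long toTrash = fs.getContentSummary(outputDir).getSpaceConsumed();
>
>         long quota = home.getSpaceQuota();              // -1 means no space quota is set
>         if (quota >= 0) {
>             long free = quota - home.getSpaceConsumed();
>             if (free < toTrash) {
>                 throw new IllegalStateException("Not enough quota under " + trashHome
>                         + " to trash " + outputDir + ": free=" + free + " B, needed=" + toTrash + " B");
>             }
>         }
>         System.out.println("Quota check passed for " + trashHome);
>     }
> }
> {code}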
> It looks like the problem is in the Hive version that Spark uses, here:
> https://github.com/apache/hive/blob/spark2/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
> where the code explicitly says:
> {code}
>     //swallow the exception
>     LOG.warn("Directory " + oldPath.toString() + " cannot be removed:" + StringUtils.stringifyException(e));
> {code}
> That is also what Hive release 1.2.1 does. However, it seems Hive fixed this in Hive 2.0.0:
> https://issues.apache.org/jira/browse/HIVE-12505
> with this patch:
> https://github.com/apache/hive/commit/f9791bebaa9d71ff62cecedb0243d09e99abfb38
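> The gist of the fix, as I read it, is to stop swallowing the failure. Below is only an illustrative sketch of that fail-fast idea, written against the Hadoop Trash API; the class and method names are mine and it is not the actual patched Hive code:
> {code}
> import java.io.IOException;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileStatus;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.fs.Trash;
> import org.apache.hadoop.hive.ql.metadata.HiveException;
>
> // Illustrative helper, not the actual Hive patch: trash every file under oldPath
> // and fail loudly if any move does not succeed, instead of logging a WARN and
> // letting the overwrite proceed on a half-cleaned directory.
> public final class FailFastTrash {
>     public static void trashFilesOrFail(FileSystem fs, Path oldPath, Configuration conf)
>             throws HiveException {
>         try {
>             for (FileStatus status : fs.listStatus(oldPath)) {
>                 // moveToAppropriateTrash returns false when nothing was moved (e.g. trash disabled)
>                 if (!Trash.moveToAppropriateTrash(fs, status.getPath(), conf)) {
>                     throw new HiveException("Failed to move " + status.getPath() + " to trash");
>                 }
>             }
>         } catch (IOException e) {
>             // Propagating the failure is the key difference from the swallowed warning
>             // quoted above: the job aborts before old and new part files get mixed.
>             throw new HiveException("Unable to clean " + oldPath + " before overwrite", e);
>         }
>     }
>
>     private FailFastTrash() {}
> }
> {code}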
> So I guess I'm trying to add urgency to SPARK-20202.



