Posted to issues@spark.apache.org by "Romain Giot (JIRA)" <ji...@apache.org> on 2016/06/21 08:17:58 UTC

[jira] [Commented] (SPARK-16091) Dataset.partitionBy.csv raises a java.io.FileNotFoundException when launched on a Hadoop cluster

    [ https://issues.apache.org/jira/browse/SPARK-16091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15341351#comment-15341351 ] 

Romain Giot commented on SPARK-16091:
-------------------------------------

OK, sorry for setting the bug's priority too high.
Note that, on my configuration, the behaviour is exactly the same if I explicitly prefix the paths with hdfs://<server>:<port>.
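For reference, a small sketch of the variant I tested with a fully qualified path; the NameNode address is kept as a placeholder, and ds refers to the Dataset built in the repro code below:
{code}
// Same partitioned write as in the repro below, with the HDFS scheme and
// NameNode address spelled out explicitly (substitute the real <server>:<port>).
// The resulting FileNotFoundException is exactly the same.
ds
  .write
  .option("header", true)
  .mode("overwrite")
  .partitionBy("A", "B")
  .csv("hdfs://<server>:<port>/tmp/test2.csv")
{code}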

> Dataset.partitionBy.csv raises a java.io.FileNotFoundException when launched on a Hadoop cluster
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-16091
>                 URL: https://issues.apache.org/jira/browse/SPARK-16091
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output
>    Affects Versions: 2.0.0
>         Environment: Hadoop version: 2.5.1 
>            Reporter: Romain Giot
>            Priority: Minor
>
> When writing a Dataset to a CSV file with partitionBy, a java.io.FileNotFoundException is raised *after* the write has completed successfully.
> This behaviour does not happen when the Spark application is launched locally; it is probably related to the way HDFS is used.
> Here is test code that reproduces the issue:
> {code}
> import org.apache.spark.SparkContext
> import org.apache.hadoop.fs.FileSystem
> import org.apache.hadoop.fs.{Path, PathFilter}
> import org.apache.spark.sql.SQLContext
> case class Test(A: String, B: String, C: String)
> object WriteTest {
>   val sc: SparkContext = new SparkContext()
>   val fs: FileSystem = FileSystem.get(sc.hadoopConfiguration)
>   val sqlContext: SQLContext = new SQLContext(sc)
>   
>   import sqlContext.implicits._
>   def main(args: Array[String]):Unit = {
>     val ds = Seq(
>       Test("abc", "abc", "abc"),
>       Test("abc", "abc", "def"), 
>       Test("abc", "abc", "ghi"), 
>       Test("abc", "xyz", "abc"),
>       Test("xyz", "xyz", "abc")
>     ).toDS()
>     // works: the plain CSV write completes without any error
>     ds
>       .write
>       .option("header",true)
>       .mode("overwrite")
>       .csv("/tmp/test1.csv")
>     // fails: the partitioned write completes, but the exception below is raised afterwards
>     ds
>       .write
>       .option("header",true)
>       .mode("overwrite")
>       .partitionBy("A", "B")
>       .csv("/tmp/test2.csv")
>   }
> }
> {code}
> and here is the exception stack trace:
> {code}
> java.io.FileNotFoundException: Path is not a file: /tmp/test2.csv/A=abc
> 	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:68)
> 	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:54)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1795)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1738)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1718)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1690)
> 	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:519)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:337)
> 	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
> 	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
> 	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> 	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 	at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
> 	at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
> 	at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
> 	at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1222)
> 	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1210)
> 	at org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:1260)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:220)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:216)
> 	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:216)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:208)
> 	at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1$$anonfun$apply$2.apply(ListingFileCatalog.scala:104)
> 	at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1$$anonfun$apply$2.apply(ListingFileCatalog.scala:92)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
> 	at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1.apply(ListingFileCatalog.scala:92)
> 	at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1.apply(ListingFileCatalog.scala:80)
> 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> 	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
> 	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> 	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> 	at org.apache.spark.sql.execution.datasources.ListingFileCatalog.listLeafFiles(ListingFileCatalog.scala:80)
> 	at org.apache.spark.sql.execution.datasources.ListingFileCatalog.refresh(ListingFileCatalog.scala:69)
> 	at org.apache.spark.sql.execution.datasources.ListingFileCatalog.<init>(ListingFileCatalog.scala:50)
> 	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:307)
> 	at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:424)
> 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:252)
> 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:234)
> 	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:697)
> 	at WriteTest$.main(write.scala:42)
> 	at WriteTest.main(write.scala)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:483)
> 	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:610)
> {code}
> And here is the result of {{hdfs dfs -ls /tmp/test2.csv}}:
> {code}
> Found 3 items
> drwxr-xr-x   - hadoop supergroup          0 2016-06-21 14:59 /tmp/test2.csv/A=abc
> drwxr-xr-x   - hadoop supergroup          0 2016-06-21 14:59 /tmp/test2.csv/A=xyz
> -rw-r--r--   3 hadoop supergroup          0 2016-06-21 14:59 /tmp/test2.csv/_SUCCESS
> {code}
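> The same listing can also be obtained programmatically through the fs handle created in the code above; a small sketch (output format approximated):
> {code}
> // Path is already imported in the repro above.
> // Prints one line per entry under the output directory: the two partition
> // directories (A=abc, A=xyz) and the empty _SUCCESS marker.
> fs.listStatus(new Path("/tmp/test2.csv")).foreach { s =>
>   val kind = if (s.isDirectory) "dir " else "file"
>   println(kind + " " + s.getLen + " " + s.getPath)
> }
> {code}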
> Judging from the stack trace, the exception is thrown while Spark re-lists the output directory after the write (ListingFileCatalog.listLeafFiles) and requests the block locations of the partition directory /tmp/test2.csv/A=abc as if it were a file. I do not know whether the bug lies in the HDFS implementation or in the way Spark uses it.
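> For what it is worth, the stack trace suggests that the same NameNode error should be reproducible without Spark; this is only a sketch derived from the trace, I have not run it in isolation:
> {code}
> // ListingFileCatalog requests block locations for every entry returned by
> // listStatus, including the partition directories. On this Hadoop version the
> // NameNode rejects such a request on a directory with "Path is not a file".
> val status = fs.getFileStatus(new Path("/tmp/test2.csv/A=abc"))
> fs.getFileBlockLocations(status, 0, status.getLen)
> {code}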


