You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2016/06/21 12:40:58 UTC
[jira] [Resolved] (SPARK-16091) Dataset.partitionBy.csv raise a
java.io.FileNotFoundException when launched on an hadoop cluster
[ https://issues.apache.org/jira/browse/SPARK-16091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-16091.
-------------------------------
Resolution: Duplicate
> Dataset.partitionBy.csv raise a java.io.FileNotFoundException when launched on an hadoop cluster
> ------------------------------------------------------------------------------------------------
>
> Key: SPARK-16091
> URL: https://issues.apache.org/jira/browse/SPARK-16091
> Project: Spark
> Issue Type: Bug
> Components: Input/Output
> Affects Versions: 2.0.0
> Environment: Hadoop version: 2.5.1
> Reporter: Romain Giot
> Priority: Minor
>
> When writing a Dataset to a CSV file, a java.io.FileNotFoundException is raised *after* the writing is done and successful.
> This behaviour does not happen when the Spark application is launched locally; it is likely related to HDFS management.
> Here is a test code:
> {code}
> import org.apache.spark.SparkContext
> import org.apache.hadoop.fs.FileSystem
> import org.apache.hadoop.fs.{Path, PathFilter}
> import org.apache.spark.sql.SQLContext
> case class Test(A: String, B: String, C:String){
> }
> object WriteTest {
> val sc: SparkContext = new SparkContext()
> val fs: FileSystem = FileSystem.get(sc.hadoopConfiguration)
> val sqlContext: SQLContext = new SQLContext(sc)
>
> import sqlContext.implicits._
> def main(args: Array[String]):Unit = {
> val ds = Seq(
> Test("abc", "abc", "abc"),
> Test("abc", "abc", "def"),
> Test("abc", "abc", "ghi"),
> Test("abc", "xyz", "abc"),
> Test("xyz", "xyz", "abc")
> ).toDS()
> // works
> ds
> .write
> .option("header",true)
> .mode("overwrite")
> .csv("/tmp/test1.csv")
> // fails
> ds
> .write
> .option("header",true)
> .mode("overwrite")
> .partitionBy("A", "B")
> .csv("/tmp/test2.csv")
> }
> }
> {code}
> and here is the exception stack:
> {code}
> java.io.FileNotFoundException: Path is not a file: /tmp/test2.csv/A=abc
> at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:68)
> at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:54)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1795)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1738)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1718)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1690)
> at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:519)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:337)
> at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
> at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
> at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
> at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1222)
> at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1210)
> at org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:1260)
> at org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:220)
> at org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:216)
> at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:216)
> at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:208)
> at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1$$anonfun$apply$2.apply(ListingFileCatalog.scala:104)
> at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1$$anonfun$apply$2.apply(ListingFileCatalog.scala:92)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
> at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1.apply(ListingFileCatalog.scala:92)
> at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1.apply(ListingFileCatalog.scala:80)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> at org.apache.spark.sql.execution.datasources.ListingFileCatalog.listLeafFiles(ListingFileCatalog.scala:80)
> at org.apache.spark.sql.execution.datasources.ListingFileCatalog.refresh(ListingFileCatalog.scala:69)
> at org.apache.spark.sql.execution.datasources.ListingFileCatalog.<init>(ListingFileCatalog.scala:50)
> at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:307)
> at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:424)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:252)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:234)
> at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:697)
> at WriteTest$.main(write.scala:42)
> at WriteTest.main(write.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:610)
> {code}
> And here is the result of hdfs dfs -ls /tmp/test2.csv:
> {code}
> Found 3 items
> drwxr-xr-x - hadoop supergroup 0 2016-06-21 14:59 /tmp/test2.csv/A=abc
> drwxr-xr-x - hadoop supergroup 0 2016-06-21 14:59 /tmp/test2.csv/A=xyz
> -rw-r--r-- 3 hadoop supergroup 0 2016-06-21 14:59 /tmp/test2.csv/_SUCCESS
> {code}
> I have no idea whether the bug comes from the HDFS implementation or from the way Spark uses it.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org