Posted to user@spark.apache.org by Mingyu Kim <mk...@palantir.com> on 2014/01/10 01:25:05 UTC
An open HDFS connection fails RDD.take()
Here's a snippet of code that throws an exception. I create a FileSystem object
for HDFS, then try to read a CSV file in HDFS as an RDD and call take() on it.
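> import java.io.IOException;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.hdfs.DistributedFileSystem;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
>
> public static void main(String[] args) throws IOException {
>     // loadDefaults = false: core-default.xml and core-site.xml are NOT loaded
>     Configuration conf = new Configuration(false);
>     conf.set("fs.default.name", "hdfs://localhost:8020");
>     conf.set("fs.hdfs.impl", DistributedFileSystem.class.getCanonicalName());
>     FileSystem fileSystem = FileSystem.get(conf);
>     // fileSystem.close();   // uncommenting this makes take() succeed
>
>     JavaSparkContext sc = new JavaSparkContext("spark://localhost:7077",
>         "MySpark", "/path/to/spark", new String[]{});
>     JavaRDD<String> rdd = sc.textFile("hdfs://localhost:8020/path/to/csv");
>     System.out.println(rdd.take(300));
> }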
It throws the following exception.
> Exception in thread "main" java.lang.IllegalStateException: Must not use direct buffers with InputStream API
> at com.google.common.base.Preconditions.checkState(Preconditions.java:149)
> at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:211)
> at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
> at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
> at org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:164)
> at org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:129)
> at org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:559)
> at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:611)
> at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:665)
> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:706)
> at java.io.DataInputStream.read(DataInputStream.java:100)
> at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
> at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:160)
> at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:103)
> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:83)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
> at scala.collection.Iterator$$anon$18.hasNext(Iterator.scala:381)
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> at scala.collection.Iterator$$anon$18.foreach(Iterator.scala:379)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:102)
> at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:250)
> at scala.collection.Iterator$$anon$18.toBuffer(Iterator.scala:379)
> at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:237)
> at scala.collection.Iterator$$anon$18.toArray(Iterator.scala:379)
> at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:768)
> at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:768)
> at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:758)
> at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:758)
> at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:484)
> at org.apache.spark.scheduler.DAGScheduler$$anon$2.run(DAGScheduler.scala:470)
However, if I uncomment fileSystem.close() in the original code, take()
finishes successfully.
This happens not only on my local machine but also on EC2. Is this a bug in
Spark, or am I using Spark and HDFS the wrong way?
Thanks,
Mingyu
Re: An open HDFS connection fails RDD.take()
Posted by Mingyu Kim <mk...@palantir.com>.
Apparently, I was creating the Hadoop Configuration object with
"loadDefaults = false", and the problem went away when I initialized it with
"loadDefaults = true", which is also how Spark creates its Configuration
objects. I don't understand exactly why that's the case, though.
Mingyu
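Neither message explains the mechanism, so what follows is a hedged reading rather than a confirmed diagnosis. Hadoop's FileSystem.get(conf) returns a cached instance keyed by URI scheme, authority and user, and the stack trace shows that take() read the first partition locally in the driver (DAGScheduler.runLocallyWithinThread), i.e. in the same JVM as the repro's FileSystem.get call. If so, Spark's HadoopRDD was likely handed the DistributedFileSystem already built from new Configuration(false), while fileSystem.close() removes that instance from the cache. The stdlib-only sketch below models that caching behaviour; ToyFsCache, Client and the Map<String, String> standing in for Configuration are hypothetical, and this is not Hadoop's implementation.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical toy model (NOT Hadoop's code) of a per-URI client cache in the
 * style of FileSystem.get(): the first caller's configuration is baked into the
 * cached client, and later callers with a different configuration share it.
 */
final class ToyFsCache {
    /** Stand-in for a FileSystem client; remembers the config it was built with. */
    static final class Client {
        final Map<String, String> conf;

        Client(Map<String, String> conf) {
            this.conf = new HashMap<>(conf); // snapshot of the creator's settings
        }
    }

    // Keyed by "scheme://authority"; Hadoop's real cache key also includes the user.
    private final Map<String, Client> cache = new HashMap<>();

    /** Returns the cached client for key, building it from conf only on a cache miss. */
    Client get(String key, Map<String, String> conf) {
        return cache.computeIfAbsent(key, k -> new Client(conf));
    }

    /** Models close(): evicts the client so the next get() builds a fresh one. */
    void close(String key) {
        cache.remove(key);
    }
}
```

In this model, get() with the repro's settings followed by get() with Spark's settings returns one shared Client that still carries loadDefaults=false; after close(), the next get() builds a fresh Client from Spark's settings. That matches both workarounds reported in this thread. Outside the model, FileSystem.newInstance(conf) returns an uncached instance, which would avoid the sharing altogether.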
From: Matei Zaharia <ma...@gmail.com>
Reply-To: "user@spark.incubator.apache.org"
<us...@spark.incubator.apache.org>
Date: Tuesday, January 14, 2014 at 12:20 PM
To: "user@spark.incubator.apache.org" <us...@spark.incubator.apache.org>
Subject: Re: An open HDFS connection fails RDD.take()
Why are you creating the FileSystem object? You should be able to just pass
a full hdfs:// URL to textFile.
It might be that HDFS initialization is not thread-safe, or you can't have
two connections to the same filesystem somehow.
Matei
Re: An open HDFS connection fails RDD.take()
Posted by Matei Zaharia <ma...@gmail.com>.
Why are you creating the FileSystem object? You should be able to just pass a full hdfs:// URL to textFile.
It might be that HDFS initialization is not thread-safe, or you can’t have two connections to the same filesystem somehow.
Matei
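Passing a full URL works because the URL itself carries the scheme and the NameNode authority, so textFile() does not depend on fs.default.name or a hand-built FileSystem. It also bears on the "two connections" hypothesis: Hadoop's FileSystem.get() caches clients per scheme, authority and user, and the repro's fs.default.name and textFile() path resolve to the same scheme and authority, so within one JVM they would likely share one client rather than open two. The sketch below shows that key derivation with java.net.URI; FsKey is a hypothetical helper, and lower-casing both parts is an assumption about how Hadoop normalizes its cache key.

```java
import java.net.URI;
import java.util.Locale;

/** Hypothetical helper: derives the "scheme://authority" part of a filesystem cache key. */
final class FsKey {
    static String of(String uriString) {
        URI uri = URI.create(uriString);
        // A missing scheme or authority (e.g. "hdfs:///path") becomes an empty string here.
        String scheme = uri.getScheme() == null ? "" : uri.getScheme().toLowerCase(Locale.ROOT);
        String authority = uri.getAuthority() == null ? "" : uri.getAuthority().toLowerCase(Locale.ROOT);
        return scheme + "://" + authority;
    }
}
```

For example, FsKey.of("hdfs://localhost:8020") and FsKey.of("hdfs://localhost:8020/path/to/csv") both return "hdfs://localhost:8020", while a different port yields a different key.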
On Jan 14, 2014, at 11:54 AM, Mingyu Kim <mk...@palantir.com> wrote:
> Pinging again… Does anyone have a clue? Is this a bug in Spark?
>
> Mingyu
Re: An open HDFS connection fails RDD.take()
Posted by Mingyu Kim <mk...@palantir.com>.
Pinging again… Does anyone have a clue? Is this a bug in Spark?
Mingyu