Posted to user@spark.apache.org by Shuai Zheng <sz...@gmail.com> on 2015/03/13 23:51:00 UTC

Spark processing of the _temporary folder on S3 is very slow and always causes failure

Hi All,

 

I am trying to run a sort on an r3.2xlarge instance on AWS, as a single-node
cluster for testing. The data I am sorting is around 4GB and sits on S3; the
output will also go to S3.

 

I just connect spark-shell to the local cluster and run the code as a script
(because I just want a benchmark for now).

 

My job is as simple as:

val parquetFile = sqlContext.parquetFile("s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,")

parquetFile.registerTempTable("Test")

val sortedResult = sqlContext.sql("SELECT * FROM Test order by time").map { row => row.mkString("\t") }

sortedResult.saveAsTextFile("s3n://myplace,");

 

The job takes around 6 minutes to finish the sort while I am monitoring the
process. Afterwards I notice the process stops at:

 

15/03/13 22:38:27 INFO DAGScheduler: Job 2 finished: saveAsTextFile at <console>:31, took 581.304992 s

 

At that point, Spark has actually just written all the data to the _temporary
folder; once all sub-tasks have finished, it tries to move the completed
results from the _temporary folder to the final location. This step might be
quick locally (because it is just a rename), but it looks very slow on my S3:
it takes a few seconds to move one file, and there are usually 200 partitions,
so the moves alone can take many minutes. It then raises exceptions after
moving maybe 40-50 files.

 

org.apache.http.NoHttpResponseException: The target server failed to respond
        at org.apache.http.impl.conn.DefaultResponseParser.parseHead(DefaultResponseParser.java:101)
        at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:252)
        at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:281)
        at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:247)
        at org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219)

 



 

I have tried several times but never got the full job to finish. I am not sure
what is wrong here; I am using something very basic, and I can see the job has
finished with all the results on S3 under the _temporary folder, but then it
raises the exception and fails.

 

Is there any special setting I should use here when dealing with S3?

 

I don't know what the issue is here; I have never seen MapReduce have a
similar problem, so it should not be an S3 issue.

 

Regards,

 

Shuai


Re: Need Advice about reading lots of text files

Posted by Pat Ferrel <pa...@occamsmachete.com>.
There are no doubt many things that feed into the right way to read a lot of files into Spark. But why force users to learn all of those factors instead of putting an optimizer layer into the read inside Spark?

BTW, I realize your method is not one task per file; it's chunked and done in parallel. It looks good for text and I may use it, but what about sequence files or JSON SchemaRDD/DataFrame reading? These will all have the same issue and are also likely to be in very many small files given the increasing popularity of Spark Streaming. It also seems like an optimizer would work in a very similar way for these.

+1 for read optimizer :-)




Re: Need Advice about reading lots of text files

Posted by Michael Armbrust <mi...@databricks.com>.
I agree that it would be better if Spark did a better job automatically here,
though doing so is probably a non-trivial amount of work.  My code is
certainly worse if you have only a few very large text files, for example,
and thus I'd generally encourage people to try the built-in options first.

However, one of the nice things about Spark, I think, is the flexibility that
it gives you. So, when you are trying to read 100,000s of tiny files this
works pretty well.  I'll also comment that this does not create a task per
file, and that is another reason it's faster for the many-small-files case.
Of course, that comes at the expense of locality (which doesn't matter for
my use case on S3 anyway)...
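
For reference, one built-in option for the many-small-files case is
sc.wholeTextFiles (available since Spark 1.0), which returns (path, contents)
pairs and packs many files into fewer partitions. A minimal sketch, with
hypothetical paths:

// Read many small files as (path, contents) pairs; minPartitions caps the
// number of partitions instead of creating one split per file.
val smallFiles = sc.wholeTextFiles("hdfs:///data/small/*", minPartitions = 32)

// Fan each file's contents out into individual lines.
val lines = smallFiles.flatMap { case (_, contents) => contents.split("\n") }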


Re: Need Advice about reading lots of text files

Posted by Imran Rashid <ir...@cloudera.com>.
Interesting; on another thread, I was just arguing that the user should
*not* open the files themselves and read them, because then they lose all the
other goodies we have in HadoopRDD, e.g. the metric tracking.

I think this strengthens Pat's argument that we might actually need better
support for this in the SparkContext itself?


Re: Need Advice about reading lots of text files

Posted by madhu phatak <ph...@gmail.com>.
Hi,
Internally, Spark uses the HDFS API to handle file data. Have a look at HAR
files and the SequenceFile input format; there is more information in this
Cloudera blog post
<http://blog.cloudera.com/blog/2009/02/the-small-files-problem/>.

Regards,
Madhukara Phatak
http://datamantra.io/
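
As a sketch of the SequenceFile idea he mentions: pack the small files once
into a single splittable file of (fileName, fileContents) records, then read
that instead of thousands of tiny files. Paths are hypothetical, and the
Writable-converter implicits available in a spark-shell session are assumed:

// Pack: one (fileName -> fileContents) record per small file.
sc.wholeTextFiles("hdfs:///data/small-files/*")
  .saveAsSequenceFile("hdfs:///data/packed")

// Read back as one splittable input rather than many tiny files.
val restored = sc.sequenceFile[String, String]("hdfs:///data/packed")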


Re: Need Advice about reading lots of text files

Posted by Pat Ferrel <pa...@occamsmachete.com>.
Ah most interesting—thanks.

So it seems sc.textFile(longFileList) has to read all the metadata before starting the read, for partitioning purposes, so what you do is avoid it altogether?

You create a task per file that reads one file (in parallel) per task without scanning for _all_ metadata. Can’t argue with the logic, but perhaps Spark should incorporate something like this in sc.textFile? My case can’t be that unusual, especially since I am periodically processing micro-batches from Spark Streaming. In fact, I have to scan HDFS to create the longFileList to begin with, so I get the file status and therefore probably all the metadata needed by sc.textFile. Your method would save one scan, which is good.

Might a better sc.textFile take a beginning URI, a file pattern regex, and a recursive flag? Then one scan could create all metadata automatically for a large subset of people using the function, something like 

    sc.textFile(beginDir: String, filePattern: String = "^part.*", recursive: Boolean = false)

In fact, it should be easy to create a BetterSC that overrides the textFile method with a re-implementation that only requires one scan to get the metadata.

Just thinking on email…
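
A rough sketch of what that one-scan helper might look like, assuming a
Hadoop 2 FileSystem (for FileStatus.isDirectory) and the sc of a spark-shell
session; the helper name and default pattern are hypothetical:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// One driver-side scan: collect the files matching the pattern, optionally
// recursing into subdirectories, then hand the list to the usual textFile.
def textFiles(beginDir: String,
              filePattern: String = "^part.*",
              recursive: Boolean = false) = {
  val fs = FileSystem.get(new URI(beginDir), new Configuration())
  def collect(p: Path): Seq[FileStatus] = fs.listStatus(p).toSeq.flatMap { s =>
    if (s.isDirectory) { if (recursive) collect(s.getPath) else Nil }
    else Seq(s)
  }
  val files = collect(new Path(beginDir))
    .map(_.getPath)
    .filter(_.getName.matches(filePattern))
  sc.textFile(files.mkString(","))
}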



Re: Need Advice about reading lots of text files

Posted by Michael Armbrust <mi...@databricks.com>.
Here is how I have dealt with many small text files (on S3, though this
should generalize) in the past:
http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201411.mbox/%3CCAAswR-58p66-Es2HaXH4i+bU__0RVxD2oKEWKLy0MEE8RuexEw@mail.gmail.com%3E




> From: Michael Armbrust <mi...@databricks.com>
> Subject: Re: S3NativeFileSystem inefficient implementation when calling sc.textFile
> Date: Thu, 27 Nov 2014 03:20:14 GMT
>
> In the past I have worked around this problem by avoiding sc.textFile().
> Instead I read the data directly inside of a Spark job.  Basically, you
> start with an RDD where each entry is a file in S3 and then flatMap that
> with something that reads the files and returns the lines.
>
> Here's an example: https://gist.github.com/marmbrus/fff0b058f134fa7752fe
>
> Using this class you can do something like:
>
> sc.parallelize("s3n://mybucket/file1" :: "s3n://mybucket/file1" ... ::
> Nil).flatMap(new ReadLinesSafe(_))
>
> You can also build up the list of files by running a Spark job:
> https://gist.github.com/marmbrus/15e72f7bc22337cf6653
>
> Michael
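
A minimal sketch of what such a helper might look like (hypothetical, not the
gist's actual code): it opens each file with the Hadoop FileSystem API inside
the task, and omits the per-file error handling that makes the real one
"safe":

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Open one file inside the task and return its lines.
def readLines(uri: String): Iterator[String] = {
  val fs = FileSystem.get(new URI(uri), new Configuration())
  scala.io.Source.fromInputStream(fs.open(new Path(uri))).getLines()
}

// One RDD entry per file name; flatMap fans each file out to its lines,
// so the files are opened on the executors, not the driver.
val lines = sc.parallelize(Seq("s3n://mybucket/file1", "s3n://mybucket/file2"))
  .flatMap(readLines)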

Re: Need Advice about reading lots of text files

Posted by Pat Ferrel <pa...@occamsmachete.com>.
It’s a long story, but there are many dirs with smallish part-xxxx files in them, so we create a list of the individual files as input to sparkContext.textFile(fileList). I suppose we could move them and rename them to be contiguous part-xxxx files in one dir. Would that be better than passing in a long list of individual filenames? We could also make the part files much larger by collecting the smaller ones. But would any of this make a difference in IO speed?

I ask because using the long file list seems to read what amounts to a not-very-large data set rather slowly. If it were all in large part files in one dir I’d expect it to go much faster, but this is just intuition.


On Mar 14, 2015, at 9:58 AM, Koert Kuipers <ko...@tresata.com> wrote:

Why can you not put them in a directory and read them as one input? You will get a task per file, but Spark is very fast at executing many tasks (it's not a JVM per task).



Re: Need Advice about reading lots of text files

Posted by Pat Ferrel <pa...@occamsmachete.com>.
Any advice on dealing with a large number of separate input files?




Need Advice about reading lots of text files

Posted by Pat Ferrel <pa...@occamsmachete.com>.
We have many text files that we need to read in parallel. We can create a comma-delimited list of files to pass in to sparkContext.textFile(fileList). The list can get very large (maybe 10000) and is all on HDFS.

The question is: what is the most performant way to read them? Should they be broken up and read in groups, appending the resulting RDDs, or should we just pass in the entire list at once? In effect I’m asking whether Spark does some optimization here or whether we should do it explicitly. If the latter, what rule might we use, depending on our cluster setup?
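
For concreteness, the two variants side by side, with a hypothetical file
list and batch size:

// Hypothetical file list; in practice it comes from scanning HDFS.
val fileList: Seq[String] = (0 until 10000).map(i => s"hdfs:///data/dir-$i/part-00000")

// Variant 1: hand the entire comma-joined list to a single textFile call.
val allAtOnce = sc.textFile(fileList.mkString(","))

// Variant 2: read in groups and append (union) the resulting RDDs.
val inGroups = sc.union(fileList.grouped(500).toSeq.map(g => sc.textFile(g.mkString(","))))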


RE: Spark processing of the _temporary folder on S3 is very slow and always causes failure

Posted by Shuai Zheng <sz...@gmail.com>.
Thanks!

 

Let me update the status.

 

I have copied the DirectOutputCommitter into my local project and set:

 

Conf.set("spark.hadoop.mapred.output.committer.class", "org.****.DirectOutputCommitter")

 

It works perfectly.

 

Thanks everyone :)

 

Regards,

 

Shuai

 



Re: Spark processing of the _temporary folder on S3 is very slow and always causes failure

Posted by Aaron Davidson <il...@gmail.com>.
Actually, this is the more relevant JIRA (which is resolved):
https://issues.apache.org/jira/browse/SPARK-3595

6352 is about saveAsParquetFile, which is not in use here.

Here is a DirectOutputCommitter implementation:
https://gist.github.com/aarondav/c513916e72101bbe14ec

and it can be configured in Spark with:
sparkConf.set("spark.hadoop.mapred.output.committer.class", classOf[DirectOutputCommitter].getName)
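
The core of such a committer is just a set of no-ops: tasks write straight to
the final location, so there is no _temporary output to promote. A minimal
sketch against the old mapred API, not the gist verbatim:

import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}

class DirectOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = {}
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
  // No per-task output to move, so the framework skips the commit phase.
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
}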


Re: Spark processing of the _temporary folder on S3 is very slow and always causes failure

Posted by Imran Rashid <ir...@cloudera.com>.
I'm not super familiar w/ S3, but I think the issue is that you want to use
a different output committer with "object" stores, which don't have a
simple move operation.  There have been a few other threads on S3 and
output committers.  I think the most relevant one for you is probably this
open JIRA:

https://issues.apache.org/jira/browse/SPARK-6352
