Posted to user@spark.apache.org by Ramkumar Chokkalingam <ra...@gmail.com> on 2013/10/07 05:51:14 UTC

Output to a single directory with multiple files rather multiple directories ?

Hello,

I have started experimenting with a Spark cluster. I have a parallelization
job in which I want to parse several folders, each containing multiple
files. I parse each file, do some processing on its records, and write the
whole file back to an output file. I apply the same processing operation
(hashing certain fields in the data file) to all the files inside the
folder. Simply put:

For a directory D,
  Read all files inside D.
    For each file F:
      Loop: for each line L in F, do some processing and write the
      processing output to a file.

So if there are 200 files inside the input directory, I would like to have
200 files in my output directory. I learned that with the
saveAsTextFile(name) API, Spark creates a directory with the name we specify
and writes the actual output files inside that folder as part-00000,
part-00001, etc. (similar to Hadoop, I assume).
My question: is there a way to specify the name of the output directory and
redirect all my saveAsTextFile(dirName) outputs into a single folder
instead?

Let me know if there is a way of achieving this. If not, I would appreciate
hearing some workarounds. Thanks!


Regards,

Ramkumar Chokkalingam,
Masters Student, University of Washington || 206-747-3515
 <http://www.linkedin.com/in/mynameisram>

Re: Output to a single directory with multiple files rather multiple directories ?

Posted by Ramkumar Chokkalingam <ra...@gmail.com>.
Yes, now I got it. Thanks Mark. Will try this and update!

Re: Output to a single directory with multiple files rather multiple directories ?

Posted by Mark Hamstra <ma...@clearstorydata.com>.
If your input files are already in HDFS, then parallelizing the parsing or
other transformation of their contents in Spark should be easy -- that's
just the way the system works.  So you should end up with something like:

val inputFilenames: List[String] = ??? // whatever you need to do to generate
                                       // a list of your filenames as hdfs: URIs
inputFilenames.foreach { filename =>
  val lines = sc.textFile(filename)
  lines.map(line => parsingFunction(line))
    .saveAsHadoopFile(...)
  // or saveAsNewAPIHadoopFile; either way, I'm making the assumption that
  // your parsingFunction has resulted in an RDD[(K, V)] for some K and V
}

The parsingFunction will automatically be applied in parallel across the
splits of your HDFS files.  If that isn't generating enough parallelism for
you, then you can use the optional minSplits parameter of textFile() to
produce more splits: e.g., sc.textFile(filename, 16).
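
The parsingFunction in the snippet above is never defined in this thread. As a rough illustration only, here is a minimal sketch of what such a function could look like for the "hash certain fields" use case from the original question. The comma separator, the choice of SHA-256, and the names FieldHashing, sha256Hex, and hashedFields are all assumptions, not something the posters specified. Note also that this version returns a plain String, which would suit saveAsTextFile; saveAsHadoopFile would need key-value pairs instead.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import java.util.regex.Pattern

object FieldHashing {
  // Hex-encoded SHA-256 digest of a single field value.
  def sha256Hex(value: String): String = {
    val digest = MessageDigest.getInstance("SHA-256")
      .digest(value.getBytes(StandardCharsets.UTF_8))
    digest.map(b => f"$b%02x").mkString
  }

  // Hypothetical parsingFunction: replaces the fields at the given
  // zero-based positions with their hashes and leaves the rest untouched.
  // The separator is treated literally, not as a regular expression.
  def parsingFunction(line: String, hashedFields: Set[Int], sep: String = ","): String =
    line.split(Pattern.quote(sep), -1).zipWithIndex.map {
      case (field, i) if hashedFields(i) => sha256Hex(field)
      case (field, _)                    => field
    }.mkString(sep)
}
```

Inside the loop above this might be used as lines.map(line => FieldHashing.parsingFunction(line, Set(0))), assuming the first column is the one to hash.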




On Fri, Oct 11, 2013 at 12:17 PM, Ramkumar Chokkalingam <
ramkumar.au@gmail.com> wrote:

> Thanks for the recommendation,Mark.
>
> I have Setup Hadoop and was using the HDFS to run my MR jobs, hence I
> assume it wouldn't take much of time to start using them from Spark code.I
> can write scripts to move them to HDFS before running my spark code.
> Since, You suggested I don't need to call parallelize() on any object,
> should I go with the following approach,
>
> *Reading input from HDFS as a file each,*
> * output = Parse the file *
> *Writing the output to a HFS file using HADOOP API*
> * Repeat the process for all input files*
>
> Should this be the pipeline I must be following, given that my input files
> are ~4MB each, and I process(parse) a file each Where/How does the
> parallelization (of my parsing )happens ?

Re: Output to a single directory with multiple files rather multiple directories ?

Posted by Ramkumar Chokkalingam <ra...@gmail.com>.
Thanks for the recommendation, Mark.

I have set up Hadoop and have been using HDFS to run my MR jobs, so I
assume it won't take much time to start using it from Spark code. I can
write scripts to move the files to HDFS before running my Spark code.
Since you suggested that I don't need to call parallelize() on any object,
should I go with the following approach?

Read each input file from HDFS,
output = parse the file,
write the output to an HDFS file using the Hadoop API,
repeat the process for all input files.

Should this be the pipeline I follow, given that my input files are ~4MB
each and I process (parse) one file at a time? Where and how does the
parallelization of my parsing happen?

Re: Output to a single directory with multiple files rather multiple directories ?

Posted by Mark Hamstra <ma...@clearstorydata.com>.
As a general rule, data in HDFS is more useful than data in NFS, and data
in NFS is more useful than data in local files; so I'd recommend that you
investigate how to get your data into the distributed filesystem early so
that you can work with it in parallel using Spark or other tools that work
with HDFS.  Using Spark to push data into HDFS is possible, but not optimal
-- it will soon become a bottleneck for large datasets.  Moving data across
the network is expensive, so it is worth taking the design time and even
writing custom scripts or code to minimize such transfers and not
necessarily trying to do everything from within Spark.


On Fri, Oct 11, 2013 at 10:59 AM, Ramkumar Chokkalingam <
ramkumar.au@gmail.com> wrote:

> Thanks Mark, for the response.
>
> I have my input on the server as local files. we haven't thought if we
> might set-up a NFS server. We have configured the server machine -
> installed Hadoop and have HFS setup. To achieve my goal, What is the change
> that you would recommend over the pipeline I suggested ?
>
>

Re: Output to a single directory with multiple files rather multiple directories ?

Posted by Ramkumar Chokkalingam <ra...@gmail.com>.
Thanks for the response, Mark.

I have my input on the server as local files. We haven't considered
setting up an NFS server. We have configured the server machine: Hadoop is
installed and HDFS is set up. To achieve my goal, what change would you
recommend to the pipeline I suggested?

Re: Output to a single directory with multiple files rather multiple directories ?

Posted by Mark Hamstra <ma...@clearstorydata.com>.
That won't work.  First, parallelize is a SparkContext method called on
collections present in your driver process, not an RDD method.  An RDD is
already a parallel collection, so there is no need to parallelize it.
 Second, where do your input files reside?  It makes a big difference
whether they are just regular files local to your driver, or in a network
filesystem accessible to the worker nodes, or already in a
Hadoop-compatible distributed filesystem like HDFS.


On Thu, Oct 10, 2013 at 10:10 PM, Ramkumar Chokkalingam <
ramkumar.au@gmail.com> wrote:

> Thanks both for you time. To make it clear before I start off -
>
> From my input folder,
> Read all the filenames into a Spark RDD, say InputFilesRDD
> Call InputFilesRDD.parallelize() on that collection [which would split my
> input data filenames among various clusters]
> outputRDD = InputFilesRDD.foreach(filename => {Read the file [from local
> disk ?] and parse})
> write the output(outputRDD) to Hadoop DFS using Hadoop API.
>
> So, in this pipeline -> my input will be in my local disk[read from] and
> only while writing , I write[output] to Hadoop FileSystem as multiple files
> ?
>
> I find some Hadoop API's under JavaSparkContext<http://spark.incubator.apache.org/docs/0.6.1/api/core/spark/api/java/JavaSparkContext.html> and
> a dedicated Hadoop API NewHadoopRDD<http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.NewHadoopRDD> .
> Is this what you are were referring to ?

Re: Output to a single directory with multiple files rather multiple directories ?

Posted by Ramkumar Chokkalingam <ra...@gmail.com>.
Thanks, both, for your time. To make it clear before I start off:

From my input folder,
Read all the filenames into a Spark RDD, say InputFilesRDD.
Call InputFilesRDD.parallelize() on that collection [which would split my
input data filenames among the nodes in the cluster].
outputRDD = InputFilesRDD.foreach(filename => {read the file [from local
disk?] and parse it})
Write the output (outputRDD) to Hadoop DFS using the Hadoop API.

So, in this pipeline, my input is read from my local disk, and only the
output is written to the Hadoop filesystem, as multiple files?

I found some Hadoop APIs under JavaSparkContext
<http://spark.incubator.apache.org/docs/0.6.1/api/core/spark/api/java/JavaSparkContext.html>
and a dedicated Hadoop API, NewHadoopRDD
<http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.NewHadoopRDD>.
Is this what you were referring to?

Re: Output to a single directory with multiple files rather multiple directories ?

Posted by Matei Zaharia <ma...@gmail.com>.
Yeah, Christopher answered this before I could, but you can list the directory in the driver nodes, find out all the filenames, and then use SparkContext.parallelize() on an array of filenames to split the set of filenames among tasks. After that, run a foreach() on the parallelized RDD and have your tasks read the input file and write the corresponding output file using the HDFS API.
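
The steps described above (list the directory in the driver, then have each task read one input file and write the corresponding output file) can be sketched roughly as follows. This is only an illustration under stated assumptions: it uses local files and java.io rather than the HDFS API, it runs the per-file step with a plain sequential map instead of a parallelized RDD, and the names PerFileJob, listInputFiles, processFile, and run are hypothetical.

```scala
import java.io.{File, PrintWriter}
import scala.io.Source

object PerFileJob {
  // Regular files directly inside inputDir, sorted by name.
  // In the approach above, this listing happens in the driver.
  def listInputFiles(inputDir: File): Seq[File] =
    Option(inputDir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
      .filter(_.isFile).sortBy(_.getName)

  // Task body: read one input file, transform each line, and write one
  // output file with the same name into outputDir.
  def processFile(input: File, outputDir: File, transform: String => String): File = {
    outputDir.mkdirs()
    val output = new File(outputDir, input.getName)
    val source = Source.fromFile(input, "UTF-8")
    val writer = new PrintWriter(output, "UTF-8")
    try source.getLines().foreach(line => writer.println(transform(line)))
    finally { writer.close(); source.close() }
    output
  }

  // Local, sequential stand-in for the parallel step: applies processFile
  // to every input file and returns the files that were written.
  def run(inputDir: File, outputDir: File, transform: String => String): Seq[File] =
    listInputFiles(inputDir).map(f => processFile(f, outputDir, transform))
}
```

On a cluster, the map in run would presumably become something like sc.parallelize(fileNames).foreach(...), with the task body reading and writing through a filesystem that every worker can reach (for example via the HDFS API), since a worker cannot see files that exist only on the driver's local disk.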

Matei

On Oct 10, 2013, at 5:50 PM, Christopher Nguyen <ct...@adatao.com> wrote:

> Ramkumar, it sounds like you can consider a file-parallel approach rather than a strict data-parallel parsing of the problem. In other words, separate the file copying task from the file parsing task. Have the driver program D handle the directory scan, which then parallelizes the file list into N slaves S[1 .. N]. The file contents themselves can be either passed from driver D to slaves S as (a) a serialized data structure, (b) copied by the driver D into HDFS, or (c) copied via other distributed filesystem such as NFS. When the slave processing is complete, it writes the result back out to HDFS, which is then picked up by D and copied to your desired output directory structure.
> 
> This is admittedly a bit of file copying back and forth over the network, but if your input structure is some file system, and output structure is the same, then you'd incur that cost at some point anyway. And if the file parsing is much more expensive than file transfer, then you do get significant speed gains in parallelizing the parsing task.
> 
> It's also quite conducive to getting to code complete in a hour or less. KISS.
> 
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao
> linkedin.com/in/ctnguyen
> 
> 
> 
> On Thu, Oct 10, 2013 at 4:30 PM, Ramkumar Chokkalingam <ra...@gmail.com> wrote:
> Hey, 
> 
> Thanks for the mail, Matei. Since, I need to have the output  directory structure to be same as the input directory structure with some changes made to the content of those files while parsing [ replacing certain fields with its encrypted value]. I wouldn't want the union to combine few of the input files into a single file. 
> 
> Is there some API which would treat each file as independent and write to a output file ? That would've been great. 
> 
> If it doesn't work, then I have to write them each to a folder and process each of them (using some script) to match my input directory structure. 
> 
> 
>  
> 


Re: Output to a single directory with multiple files rather multiple directories ?

Posted by Christopher Nguyen <ct...@adatao.com>.
Ramkumar, it sounds like you can consider a file-parallel approach rather
than a strict data-parallel parsing of the problem. In other words,
separate the file copying task from the file parsing task. Have the driver
program D handle the directory scan, which then parallelizes the file list
into N slaves S[1 .. N]. The file contents themselves can be (a) passed from
driver D to slaves S as a serialized data structure, (b) copied by the driver
D into HDFS, or (c) copied via another distributed filesystem such as NFS.
When the slave processing is complete, it writes the result back
out to HDFS, which is then picked up by D and copied to your desired output
directory structure.

This is admittedly a bit of file copying back and forth over the network,
but if your input structure is some file system, and output structure is
the same, then you'd incur that cost at some point anyway. And if the file
parsing is much more expensive than file transfer, then you do get
significant speed gains in parallelizing the parsing task.

It's also quite conducive to getting to code complete in an hour or less.
KISS.
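
For the final step, copying results into the desired output directory structure, one piece that is easy to get wrong is computing where each output file belongs. A minimal sketch, assuming the output tree should mirror the input tree exactly; the names MirrorPaths and mirroredOutput are hypothetical and only java.nio is used:

```scala
import java.nio.file.Path

object MirrorPaths {
  // Maps an input file to the path with the same relative location
  // under outputRoot. Fails if inputFile is not inside inputRoot.
  def mirroredOutput(inputRoot: Path, outputRoot: Path, inputFile: Path): Path = {
    val relative = inputRoot.toAbsolutePath.normalize
      .relativize(inputFile.toAbsolutePath.normalize)
    require(!relative.startsWith(".."), s"$inputFile is not under $inputRoot")
    outputRoot.resolve(relative)
  }
}
```

For example, with an input root of /data/in and an output root of /data/out, the file /data/in/d1/f1.txt maps to /data/out/d1/f1.txt.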

--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Thu, Oct 10, 2013 at 4:30 PM, Ramkumar Chokkalingam <
ramkumar.au@gmail.com> wrote:

> Hey,
>
> Thanks for the mail, Matei. Since, I need to have the output  directory
> structure to be same as the input directory structure with some changes
> made to the content of those files while parsing [ replacing certain fields
> with its encrypted value]. I wouldn't want the union to combine few of the
> input files into a single file.
>
> Is there some API which would treat each file as independent and write to
> a output file ? That would've been great.
>
> If it doesn't work, then I have to write them each to a folder and process
> each of them (using some script) to match my input directory structure.

Re: Output to a single directory with multiple files rather multiple directories ?

Posted by Ramkumar Chokkalingam <ra...@gmail.com>.
Hey,

Thanks for the mail, Matei. I need the output directory structure to be the
same as the input directory structure, with some changes made to the content
of those files while parsing (replacing certain fields with their encrypted
values), so I wouldn't want the union to combine several of the input files
into a single file.

Is there some API that would treat each file as independent and write it to
its own output file? That would be great.

If not, then I'll have to write each of them to a folder and post-process
them (using some script) to match my input directory structure.

Re: Output to a single directory with multiple files rather multiple directories ?

Posted by Matei Zaharia <ma...@gmail.com>.
Hey, sorry, for this question, there's a similar answer to the previous one. You'll have to move the files from the output directories into a common directory by hand, possibly renaming them. The Hadoop InputFormat and OutputFormat APIs that we use are just designed to work at the level of directories (one directory represents one dataset).
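
The "move the files by hand" step could be scripted along these lines. This is a sketch under assumptions, not a recommended tool: it works on a local or mounted filesystem through java.nio (for HDFS, the Hadoop FileSystem API's rename would be the analogous call), and the naming scheme of prefixing each part file with its output directory's name is just one possible convention. CollectParts and collect are hypothetical names.

```scala
import java.io.File
import java.nio.file.{Files, StandardCopyOption}

object CollectParts {
  // Moves every part-* file from each job output directory into targetDir,
  // prefixing it with the job directory's name so that names stay unique.
  // Other files (such as _SUCCESS markers) are left where they are.
  def collect(jobDirs: Seq[File], targetDir: File): Seq[File] = {
    targetDir.mkdirs()
    for {
      dir  <- jobDirs
      part <- Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty).sortBy(_.getName)
      if part.isFile && part.getName.startsWith("part-")
    } yield {
      val dest = new File(targetDir, s"${dir.getName}-${part.getName}")
      Files.move(part.toPath, dest.toPath, StandardCopyOption.REPLACE_EXISTING)
      dest
    }
  }
}
```

If each input file is small enough to produce a single part-00000, the result is one output file per input file in the common directory.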

One other option may be to build a union of multiple RDDs, using SparkContext.union(rdd1, rdd2, etc), and then call saveAsTextFile on that. Now they'll all be written to the same output location.

Matei

On Oct 6, 2013, at 8:51 PM, Ramkumar Chokkalingam <ra...@gmail.com> wrote:

> 
> Hello, 
> 
> I have started experimenting with Spark Cluster. I have a parallelization job where I want to parse through several folders and each of them has multiple files,which I parse and do some file processing on the files' records and write the whole file back to a output file. I do the same processing operation(Hashing certain fields in the data file) for all the files inside the Folder. Simply, 
> 
> For a directory D, 
>   Read all files inside D. 
>     For each File F
>       Loop: For each line L in File, I do some processing and write my processing output to a file. 
> 
> So if there are 200 files inside input directory - I would like to have 200 files in my output directory. I learnt that with SaveAsTextFile(Name) API spark creates a directory with the name we specify (Name) and creates the actual output files inside that folder in the form of part-00000,part-00001 etc.. files ( similar to Hadoop, I assumed). 
> My question is there a way where we specify the name of the output directory and redirect all my SaveAsTextFile(DirName) outputs into a single folder rather ?
> 
> Let me know if there is a way of achieving this. If not, I would appreciate hearing some workarounds. Thanks!
> 
> 
> Regards,
> 
> Ramkumar Chokkalingam, 
> Masters Student, University of Washington || 206-747-3515