You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by ruben <ru...@pandora.be> on 2016/11/08 17:11:45 UTC

use case reading files split per id

Hey,

We have files organized on hdfs in this manner:

base_folder
|- <ID1>
|------------- file1
|------------- file2
|------------- ...
|- <ID2>
|------------- file1
|------------- file2
|------------- ...
| - ...

We want to be able to do the following operation on our data:

- for each ID we want to parse the lines into records (timestamp, record
data), giving us a list[(timestamp, record_data)] for each ID
- then we want to do an operation on list[(timestamp, record_data)], giving
us a list[output], note that this operation is not a simple map operation
(timestamp, record_data) -> output, but it requires to know the full list of
records for an id

Currently we are doing this in the following way:

val ids: List[String] = <insert list of all our ids>
val idsWithPaths: List[(String, List[String])] = <append path strings for
ids>
sc.parallelize(idsWithPaths, partitions)
.map{ case (id, pathList) =>
  val sourceList: List[Source] = <convert pathList to sources>
  val combinedIterator: Iterator = sourceList.map(_.getLines()).reduceLeft(_
++ _)
  val records:List[(Timestamp, RecordData)] = parseRecords(combinedIterator,
id)
  val output: List[Output] = generateOutput(records, id)
}

I would like to know if this is a good way to do this operation. It seems to
me that it doesn't make full use of the capabilities of spark (data locality
for example, since there is no way for the partitioner to know how to
distribute the ids close to the files on hdfs). Some attempts where made to
translate this using sc.textfile and sc.wholetextfiles but by doing some
small benchmarks it seemed that those were slower (but it could be due to
the specific implementation, since it required some groupByKey/reduceByKey
steps to gather the data for each ID into a list[(timestamp, record_data)]
to be able to do the generateOutput function).



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/use-case-reading-files-split-per-id-tp28044.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: use case reading files split per id

Posted by ayan guha <gu...@gmail.com>.
How about following approach -

- get the list of ID
- get one rdd each for them using wholetextfile
- map and flatmap to generate pair rdd with ID as key and list as value
- union all the RDD s together
- group by key
On 15 Nov 2016 16:43, "Mo Tao" <my...@qq.com> wrote:

> Hi ruben,
>
> You may try sc.binaryFiles which is designed for lots of small files and it
> can map paths into inputstreams.
> Each inputstream will keep only the path and some configuration, so it
> would
> be cheap to shuffle them.
> However, I'm not sure whether spark take the data locality into account
> while dealing with these inputstreams.
>
> Hope this helps
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/use-case-reading-files-split-per-
> id-tp28044p28075.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
>

Re: use case reading files split per id

Posted by ruben <ru...@pandora.be>.
Yes that binary files function looks interesting, thanks for the tip.

Some followup questions:

- I always wonder when people are talking about 'small' files and 'large'
files. Is there any rule of thumb when these things apply? Are small files
those which can fit completely in memory on the node and large files do not?

- If it works similarly to wholeTextFiles it will give me tuples like this:
(/base/id1/file1, contentA)
(/base/id1/file2, contentB)
...
(/base/id2/file1, contentC)
(/base/id2/file2, contentD)
...

since I want to end up with tuples like:
(id1, parsedContentA ++ parsedContentB ++ ...)
(id2, parsedContentC ++ parsedContentD ++ ...)

would reduceByKey be the best function to accomplish this?
will using dataFrames give me any benefits here?
This will end up with some shuffling of parsedContent's which are
List[(Timestamp, RecordData)] right? but I guess this is not really
something which can be avoided.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/use-case-reading-files-split-per-id-tp28044p28086.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: use case reading files split per id

Posted by Mo Tao <my...@qq.com>.
Hi ruben,

You may try sc.binaryFiles which is designed for lots of small files and it
can map paths into inputstreams.
Each inputstream will keep only the path and some configuration, so it would
be cheap to shuffle them.
However, I'm not sure whether spark take the data locality into account
while dealing with these inputstreams.

Hope this helps



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/use-case-reading-files-split-per-id-tp28044p28075.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org