Posted to dev@spark.apache.org by Alexander Pivovarov <ap...@gmail.com> on 2016/05/15 03:13:44 UTC

combinedTextFile and CombineTextInputFormat

Hello Everyone

Do you think it would be useful to add a combinedTextFile method (which uses
CombineTextInputFormat) to SparkContext?

It allows one task to read data from multiple text files, and lets you control
the number of RDD partitions by setting
mapreduce.input.fileinputformat.split.maxsize.


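  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat
  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  // Read text files via CombineTextInputFormat so one task can cover several files.
  def combinedTextFile(sc: SparkContext)(path: String): RDD[String] = {
    val conf = sc.hadoopConfiguration
    sc.newAPIHadoopFile(path, classOf[CombineTextInputFormat],
        classOf[LongWritable], classOf[Text], conf)
      .map(pair => pair._2.toString).setName(path)
  }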
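
As a usage sketch of the helper above: the split cap can be set on a copy of
the Hadoop configuration rather than on the shared sc.hadoopConfiguration, so
other reads in the same application are not affected. The object name
CombinedTextFileExample, the maxSplitBytes parameter, the local master and
the 256 MB value are illustrative assumptions, not part of the proposal.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CombinedTextFileExample {
  // Hypothetical variant of combinedTextFile that takes the split cap as a parameter.
  def combinedTextFile(sc: SparkContext, maxSplitBytes: Long)(path: String): RDD[String] = {
    // Copy the shared configuration so the setting applies to this read only.
    val conf = new Configuration(sc.hadoopConfiguration)
    conf.setLong("mapreduce.input.fileinputformat.split.maxsize", maxSplitBytes)
    sc.newAPIHadoopFile(path, classOf[CombineTextInputFormat],
        classOf[LongWritable], classOf[Text], conf)
      .map(pair => pair._2.toString)
      .setName(path)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("combined-text-file").setMaster("local[*]"))
    try {
      // The cap is given in bytes: 256 MB here.
      val lines = combinedTextFile(sc, 256L * 1024 * 1024)(args(0))
      println(s"partitions: ${lines.getNumPartitions}")
    } finally {
      sc.stop()
    }
  }
}
```

Passing the cap per call keeps the setting next to the read it affects; setting
it once on sc.hadoopConfiguration, as in the original snippet, works as well.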


Alex

Re: combinedTextFile and CombineTextInputFormat

Posted by Xiangrui Meng <me...@gmail.com>.
sc.wholeTextFiles is not exactly the same as what you suggested, but you can
chain it with flatMap to get what you want, if each file is not huge.
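
The chaining described here can be sketched as follows. sc.wholeTextFiles
yields one (path, content) pair per file, so each file is held in memory
whole, which is why this only suits files that are not huge. The names
WholeTextFilesLines, splitLines and linesViaWholeTextFiles, and the
line-splitting rule, are assumptions for illustration.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object WholeTextFilesLines {
  // Split one file's content into lines, accepting both \n and \r\n endings.
  def splitLines(content: String): Seq[String] =
    if (content.isEmpty) Seq.empty[String] else content.split("\r?\n").toSeq

  // One record per line, built from one (path, content) pair per file.
  def linesViaWholeTextFiles(sc: SparkContext, path: String): RDD[String] =
    sc.wholeTextFiles(path).flatMap { case (_, content) => splitLines(content) }
}
```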

Re: combinedTextFile and CombineTextInputFormat

Posted by Reynold Xin <rx...@databricks.com>.
It is different, isn't it? wholeTextFiles returns one element per file,
whereas the combined input format is similar to coalescing partitions to
bin-pack them into a certain size.
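
To make the bin-packing idea concrete, here is a simplified sketch over file
sizes only. It is a greedy illustration, not Hadoop's actual algorithm (which
also takes block, node and rack locality into account and can divide large
files), and BinPackSketch and packIntoSplits are hypothetical names.

```scala
object BinPackSketch {
  // Greedily group file sizes (in bytes) into splits of at most maxSplitBytes.
  // A file larger than the cap gets a split of its own in this simplified model.
  def packIntoSplits(fileSizes: Seq[Long], maxSplitBytes: Long): Vector[Vector[Long]] = {
    require(maxSplitBytes > 0, "maxSplitBytes must be positive")
    val start = (Vector.empty[Vector[Long]], Vector.empty[Long])
    val (closed, open) = fileSizes.foldLeft(start) {
      case ((done, current), size) =>
        // Close the current split when adding this file would exceed the cap.
        if (current.nonEmpty && current.sum + size > maxSplitBytes) (done :+ current, Vector(size))
        else (done, current :+ size)
    }
    if (open.nonEmpty) closed :+ open else closed
  }

  def main(args: Array[String]): Unit = {
    val kb = 1024L
    val sizes = Seq(100 * kb, 300 * kb, 700 * kb, 200 * kb, 900 * kb)
    // Five files packed under a 1 MB cap end up in three splits.
    packIntoSplits(sizes, 1024 * kb).foreach(s => println(s.map(_ / kb).mkString("+") + " KB"))
  }
}
```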

Re: combinedTextFile and CombineTextInputFormat

Posted by Xiangrui Meng <me...@gmail.com>.
This was implemented as sc.wholeTextFiles.

Re: combinedTextFile and CombineTextInputFormat

Posted by Saisai Shao <sa...@gmail.com>.
Hi Alex,

From my understanding, the community is shifting its effort from RDD-based
APIs to Dataset/DataFrame-based ones, so to me it is not really necessary to
add a new RDD-based API, as I mentioned before. As for the problem of too
many partitions, I think there are many other ways to handle it.
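
The message does not name specific alternatives. One commonly used option,
offered here only as an assumption about what is meant, is to read with the
default textFile and then coalesce. The names CoalesceAfterRead and
textFileCoalesced are hypothetical.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object CoalesceAfterRead {
  // Read with the default TextInputFormat, then merge partitions without a shuffle.
  def textFileCoalesced(sc: SparkContext, path: String, targetPartitions: Int): RDD[String] = {
    require(targetPartitions > 0, "targetPartitions must be positive")
    sc.textFile(path).coalesce(targetPartitions)
  }
}
```

Note that coalesce targets a partition count rather than a byte size, so the
merged partitions can still be uneven when file sizes vary widely.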

Of course, this is just my own opinion.

Thanks
Saisai

Re: combinedTextFile and CombineTextInputFormat

Posted by Alexander Pivovarov <ap...@gmail.com>.
Saisai, Reynold,

Thank you for your replies.
I also think that many variations of the textFile() method might be confusing
for users. It is better to have just one good textFile() implementation.

Do you think sc.textFile() should use CombineTextInputFormat instead
of TextInputFormat?

CombineTextInputFormat lets users control the number of partitions in an RDD
by controlling the split size.
This is useful for real workloads (e.g. 100 folders, 200,000 files of
varying sizes from 100KB to 500MB, 4TB in total).

With the current implementation of sc.textFile(), this generates an RDD with
250,000+ partitions (one partition for each small file, several partitions
for each big file).

CombineTextInputFormat lets us control the number of partitions and the
split size by setting the mapreduce.input.fileinputformat.split.maxsize
property. For example, if we set it to 256MB, Spark will generate an RDD with
~20,000 partitions.

It's better to have an RDD with 20,000 partitions of about 256MB each than an
RDD with 250,000+ partitions of varying sizes from 100KB to 128MB.
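
The arithmetic behind these figures can be checked with a few lines. The
sketch below assumes binary units (1 TB = 1024^4 bytes); that assumption and
the names PartitionEstimate and minSplits are illustrative. It gives a lower
bound of 16,384 combined splits for 4TB at 256MB, the same order of magnitude
as the rough ~20,000 quoted above; the real count can be higher because splits
do not always fill up to the cap. Without combining, a split never spans
files, so 200,000 files alone imply at least 200,000 partitions.

```scala
object PartitionEstimate {
  val MB: Long = 1024L * 1024
  val TB: Long = MB * 1024 * 1024

  // Lower bound on the split count: total bytes divided by the cap, rounded up.
  def minSplits(totalBytes: Long, maxSplitBytes: Long): Long = {
    require(maxSplitBytes > 0, "maxSplitBytes must be positive")
    (totalBytes + maxSplitBytes - 1) / maxSplitBytes
  }

  def main(args: Array[String]): Unit = {
    println(minSplits(4 * TB, 256 * MB)) // 16384
    println(minSplits(4 * TB, 128 * MB)) // 32768
  }
}
```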

So I see only advantages if sc.textFile() starts using CombineTextInputFormat
instead of TextInputFormat.

Alex

Re: combinedTextFile and CombineTextInputFormat

Posted by Saisai Shao <sa...@gmail.com>.
From my understanding, newAPIHadoopFile or hadoopFile is generic enough to
support any InputFormat you want. IMO it is not really necessary to add a
new API for this.
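
As a sketch of that point: one small helper over newAPIHadoopFile can read
lines through any new-API InputFormat whose records are (LongWritable, Text),
with the plain and the combining format differing only in the class passed
in. The names AnyTextInputFormat, textVia, plain and combined are
hypothetical.

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.InputFormat
import org.apache.hadoop.mapreduce.lib.input.{CombineTextInputFormat, TextInputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object AnyTextInputFormat {
  // Hypothetical helper: read lines through any new-API InputFormat[LongWritable, Text].
  def textVia[F <: InputFormat[LongWritable, Text]](
      sc: SparkContext, path: String, format: Class[F]): RDD[String] =
    sc.newAPIHadoopFile[LongWritable, Text, F](
        path, format, classOf[LongWritable], classOf[Text], sc.hadoopConfiguration)
      .map { case (_, line) => line.toString }

  // The same helper serves the plain and the combining format.
  def plain(sc: SparkContext, path: String): RDD[String] =
    textVia(sc, path, classOf[TextInputFormat])

  def combined(sc: SparkContext, path: String): RDD[String] =
    textVia(sc, path, classOf[CombineTextInputFormat])
}
```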

Re: combinedTextFile and CombineTextInputFormat

Posted by Alexander Pivovarov <ap...@gmail.com>.
Spark users might not know about CombineTextInputFormat. They probably
think that sc.textFile already implements the best way to read text files.

I think CombineTextInputFormat can replace the regular TextInputFormat in
most cases.
Maybe Spark 2.0 can use CombineTextInputFormat in sc.textFile?

Re: combinedTextFile and CombineTextInputFormat

Posted by Reynold Xin <rx...@databricks.com>.
Users would be able to run this already with the 3 lines of code you
supplied, right? In general there are already a lot of methods on
SparkContext, and we lean towards the conservative side when introducing
new API variants.

Note that this is something we are doing automatically in Spark SQL for
file sources (Dataset/DataFrame).
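
For comparison, a minimal sketch of the Dataset route, assuming the Spark 2.x
SparkSession API. The spark.sql.files.maxPartitionBytes setting caps how many
bytes of file data are packed into one partition when reading; the object
name SqlTextRead, the helper names, the local master and the 256 MB value are
illustrative assumptions.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object SqlTextRead {
  // Build a local session with a cap on bytes packed into one read partition.
  def session(maxPartitionBytes: Long): SparkSession =
    SparkSession.builder()
      .appName("sql-text-read")
      .master("local[*]")
      .config("spark.sql.files.maxPartitionBytes", maxPartitionBytes)
      .getOrCreate()

  // One Dataset element per line of the input files.
  def readLines(spark: SparkSession, path: String): Dataset[String] =
    spark.read.textFile(path)

  def main(args: Array[String]): Unit = {
    val spark = session(256L * 1024 * 1024)
    try {
      val lines = readLines(spark, args(0))
      println(s"partitions: ${lines.rdd.getNumPartitions}")
    } finally {
      spark.stop()
    }
  }
}
```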

