You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Arun Patel <ar...@gmail.com> on 2016/10/06 11:26:49 UTC

Best approach for processing all files parallelly

My Pyspark program is currently identifies the list of the files from a
directory (Using Python Popen command taking hadoop fs -ls arguments).  For
each file, a Dataframe is created and processed. This is sequeatial. How to
process all files paralelly?  Please note that every file in the directory
has different schema.  So, depending on the file name, different logic is
used for each file. So, I cannot really create one Dataframe for all these
files and iterate each row.  Using wholeTextFiles seems to be good approach
for me.  But, I am not sure how to create DataFrame from this.  For
example, Is there a way to do this way do something like below.

def createDFProcess(myrdd):
    df = sqlCtx.read.json(myrdd)
    df.show()

whfiles = sc.wholeTextFiles('/input/dir1').toDF(['fname', 'fcontent'])
whfiles.map(lambda file: file.fcontent).foreach(createDFProcess)

Above code does not work.  I get an error 'TypeError: 'JavaPackage' object
is not callable'.  How to make it work?

Or is there a better approach?

-Arun

Re: Best approach for processing all files parallelly

Posted by ayan guha <gu...@gmail.com>.
Hi

Sorry for confusion, but I meant those functions to be written by you.
Those are you r business logic or etl logic
On 10 Oct 2016 21:06, "Arun Patel" <ar...@gmail.com> wrote:

> Ayan, which version of Python are you using? I am using 2.6.9 and I don't
> find generateFileType and getSchemaFor functions.  Thanks for your help.
>
> On Fri, Oct 7, 2016 at 1:17 AM, ayan guha <gu...@gmail.com> wrote:
>
>> Hi
>>
>> generateFileType (filename) returns FileType
>>
>> getSchemaFor(FileType) returns schema for FileType
>>
>> This for loop DOES NOT process files sequentially. It creates dataframes
>> on all files which are of same types sequentially.
>>
>> On Fri, Oct 7, 2016 at 12:08 AM, Arun Patel <ar...@gmail.com>
>> wrote:
>>
>>> Thanks Ayan.  Couple of questions:
>>>
>>> 1) How does generateFileType and getSchemaFor functions look like?
>>> 2) 'For loop' is processing files sequentially, right? my requirement is
>>> to process all files at same time.
>>>
>>> On Thu, Oct 6, 2016 at 8:52 AM, ayan guha <gu...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> In this case, if you see, t[1] is NOT the file content, as I have added
>>>> a "FileType" field. So, this collect is just bringing in the list of file
>>>> types, should be fine
>>>>
>>>> On Thu, Oct 6, 2016 at 11:47 PM, Arun Patel <ar...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Ayan.  I am really concerned about the collect.
>>>>>
>>>>> types = rdd1.map(lambda t: t[1]).distinct().collect()
>>>>>
>>>>> This will ship all files on to the driver, right?  It must be
>>>>> inefficient.
>>>>>
>>>>> On Thu, Oct 6, 2016 at 7:58 AM, ayan guha <gu...@gmail.com> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> I think you are correct direction. What is missing is: You do not
>>>>>> need to create DF for each file. You can scramble files with similar
>>>>>> structures together (by doing some filter on file name) and then create a
>>>>>> DF for each type of file. Also, creating DF on wholeTextFile seems wasteful
>>>>>> to me. I would probably do it like this
>>>>>>
>>>>>> rdd1 = sc.wholeTextFile(inputpath).map(lambda t:
>>>>>> (t[0],generateFileType(t[0]),t[1])
>>>>>> types = rdd1.map(lambda t: t[1]).distinct().collect()
>>>>>>
>>>>>> DFList = []
>>>>>>
>>>>>> for k in types:
>>>>>>      df = rdd1.filter(lambda t: t[1]==k).toDF(schema=getSchemaFor(k))
>>>>>>      DFList.append(df)
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 6, 2016 at 10:26 PM, Arun Patel <ar...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> My Pyspark program is currently identifies the list of the files
>>>>>>> from a directory (Using Python Popen command taking hadoop fs -ls
>>>>>>> arguments).  For each file, a Dataframe is created and processed. This is
>>>>>>> sequeatial. How to process all files paralelly?  Please note that every
>>>>>>> file in the directory has different schema.  So, depending on the file
>>>>>>> name, different logic is used for each file. So, I cannot really create one
>>>>>>> Dataframe for all these files and iterate each row.  Using wholeTextFiles
>>>>>>> seems to be good approach for me.  But, I am not sure how to create
>>>>>>> DataFrame from this.  For example, Is there a way to do this way do
>>>>>>> something like below.
>>>>>>>
>>>>>>> def createDFProcess(myrdd):
>>>>>>>     df = sqlCtx.read.json(myrdd)
>>>>>>>     df.show()
>>>>>>>
>>>>>>> whfiles = sc.wholeTextFiles('/input/dir1').toDF(['fname',
>>>>>>> 'fcontent'])
>>>>>>> whfiles.map(lambda file: file.fcontent).foreach(createDFProcess)
>>>>>>>
>>>>>>> Above code does not work.  I get an error 'TypeError: 'JavaPackage'
>>>>>>> object is not callable'.  How to make it work?
>>>>>>>
>>>>>>> Or is there a better approach?
>>>>>>>
>>>>>>> -Arun
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best Regards,
>>>>>> Ayan Guha
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Ayan Guha
>>>>
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>

Re: Best approach for processing all files parallelly

Posted by Arun Patel <ar...@gmail.com>.
Ayan, which version of Python are you using? I am using 2.6.9 and I don't
find generateFileType and getSchemaFor functions.  Thanks for your help.

On Fri, Oct 7, 2016 at 1:17 AM, ayan guha <gu...@gmail.com> wrote:

> Hi
>
> generateFileType (filename) returns FileType
>
> getSchemaFor(FileType) returns schema for FileType
>
> This for loop DOES NOT process files sequentially. It creates dataframes
> on all files which are of same types sequentially.
>
> On Fri, Oct 7, 2016 at 12:08 AM, Arun Patel <ar...@gmail.com>
> wrote:
>
>> Thanks Ayan.  Couple of questions:
>>
>> 1) How does generateFileType and getSchemaFor functions look like?
>> 2) 'For loop' is processing files sequentially, right? my requirement is
>> to process all files at same time.
>>
>> On Thu, Oct 6, 2016 at 8:52 AM, ayan guha <gu...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> In this case, if you see, t[1] is NOT the file content, as I have added
>>> a "FileType" field. So, this collect is just bringing in the list of file
>>> types, should be fine
>>>
>>> On Thu, Oct 6, 2016 at 11:47 PM, Arun Patel <ar...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Ayan.  I am really concerned about the collect.
>>>>
>>>> types = rdd1.map(lambda t: t[1]).distinct().collect()
>>>>
>>>> This will ship all files on to the driver, right?  It must be
>>>> inefficient.
>>>>
>>>> On Thu, Oct 6, 2016 at 7:58 AM, ayan guha <gu...@gmail.com> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> I think you are correct direction. What is missing is: You do not need
>>>>> to create DF for each file. You can scramble files with similar structures
>>>>> together (by doing some filter on file name) and then create a DF for each
>>>>> type of file. Also, creating DF on wholeTextFile seems wasteful to me. I
>>>>> would probably do it like this
>>>>>
>>>>> rdd1 = sc.wholeTextFile(inputpath).map(lambda t:
>>>>> (t[0],generateFileType(t[0]),t[1])
>>>>> types = rdd1.map(lambda t: t[1]).distinct().collect()
>>>>>
>>>>> DFList = []
>>>>>
>>>>> for k in types:
>>>>>      df = rdd1.filter(lambda t: t[1]==k).toDF(schema=getSchemaFor(k))
>>>>>      DFList.append(df)
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Oct 6, 2016 at 10:26 PM, Arun Patel <ar...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> My Pyspark program is currently identifies the list of the files from
>>>>>> a directory (Using Python Popen command taking hadoop fs -ls arguments).
>>>>>> For each file, a Dataframe is created and processed. This is sequeatial.
>>>>>> How to process all files paralelly?  Please note that every file in the
>>>>>> directory has different schema.  So, depending on the file name, different
>>>>>> logic is used for each file. So, I cannot really create one Dataframe for
>>>>>> all these files and iterate each row.  Using wholeTextFiles seems to be
>>>>>> good approach for me.  But, I am not sure how to create DataFrame from
>>>>>> this.  For example, Is there a way to do this way do something like below.
>>>>>>
>>>>>> def createDFProcess(myrdd):
>>>>>>     df = sqlCtx.read.json(myrdd)
>>>>>>     df.show()
>>>>>>
>>>>>> whfiles = sc.wholeTextFiles('/input/dir1').toDF(['fname',
>>>>>> 'fcontent'])
>>>>>> whfiles.map(lambda file: file.fcontent).foreach(createDFProcess)
>>>>>>
>>>>>> Above code does not work.  I get an error 'TypeError: 'JavaPackage'
>>>>>> object is not callable'.  How to make it work?
>>>>>>
>>>>>> Or is there a better approach?
>>>>>>
>>>>>> -Arun
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Ayan Guha
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>

Re: Best approach for processing all files parallelly

Posted by ayan guha <gu...@gmail.com>.
Hi

generateFileType (filename) returns FileType

getSchemaFor(FileType) returns schema for FileType

This for loop DOES NOT process files sequentially. It creates dataframes on
all files which are of same types sequentially.

On Fri, Oct 7, 2016 at 12:08 AM, Arun Patel <ar...@gmail.com> wrote:

> Thanks Ayan.  Couple of questions:
>
> 1) How does generateFileType and getSchemaFor functions look like?
> 2) 'For loop' is processing files sequentially, right? my requirement is
> to process all files at same time.
>
> On Thu, Oct 6, 2016 at 8:52 AM, ayan guha <gu...@gmail.com> wrote:
>
>> Hi
>>
>> In this case, if you see, t[1] is NOT the file content, as I have added a
>> "FileType" field. So, this collect is just bringing in the list of file
>> types, should be fine
>>
>> On Thu, Oct 6, 2016 at 11:47 PM, Arun Patel <ar...@gmail.com>
>> wrote:
>>
>>> Thanks Ayan.  I am really concerned about the collect.
>>>
>>> types = rdd1.map(lambda t: t[1]).distinct().collect()
>>>
>>> This will ship all files on to the driver, right?  It must be
>>> inefficient.
>>>
>>> On Thu, Oct 6, 2016 at 7:58 AM, ayan guha <gu...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> I think you are correct direction. What is missing is: You do not need
>>>> to create DF for each file. You can scramble files with similar structures
>>>> together (by doing some filter on file name) and then create a DF for each
>>>> type of file. Also, creating DF on wholeTextFile seems wasteful to me. I
>>>> would probably do it like this
>>>>
>>>> rdd1 = sc.wholeTextFile(inputpath).map(lambda t:
>>>> (t[0],generateFileType(t[0]),t[1])
>>>> types = rdd1.map(lambda t: t[1]).distinct().collect()
>>>>
>>>> DFList = []
>>>>
>>>> for k in types:
>>>>      df = rdd1.filter(lambda t: t[1]==k).toDF(schema=getSchemaFor(k))
>>>>      DFList.append(df)
>>>>
>>>>
>>>>
>>>> On Thu, Oct 6, 2016 at 10:26 PM, Arun Patel <ar...@gmail.com>
>>>> wrote:
>>>>
>>>>> My Pyspark program is currently identifies the list of the files from
>>>>> a directory (Using Python Popen command taking hadoop fs -ls arguments).
>>>>> For each file, a Dataframe is created and processed. This is sequeatial.
>>>>> How to process all files paralelly?  Please note that every file in the
>>>>> directory has different schema.  So, depending on the file name, different
>>>>> logic is used for each file. So, I cannot really create one Dataframe for
>>>>> all these files and iterate each row.  Using wholeTextFiles seems to be
>>>>> good approach for me.  But, I am not sure how to create DataFrame from
>>>>> this.  For example, Is there a way to do this way do something like below.
>>>>>
>>>>> def createDFProcess(myrdd):
>>>>>     df = sqlCtx.read.json(myrdd)
>>>>>     df.show()
>>>>>
>>>>> whfiles = sc.wholeTextFiles('/input/dir1').toDF(['fname', 'fcontent'])
>>>>> whfiles.map(lambda file: file.fcontent).foreach(createDFProcess)
>>>>>
>>>>> Above code does not work.  I get an error 'TypeError: 'JavaPackage'
>>>>> object is not callable'.  How to make it work?
>>>>>
>>>>> Or is there a better approach?
>>>>>
>>>>> -Arun
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Ayan Guha
>>>>
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha

Re: Best approach for processing all files parallelly

Posted by Arun Patel <ar...@gmail.com>.
Thanks Ayan.  Couple of questions:

1) How does generateFileType and getSchemaFor functions look like?
2) 'For loop' is processing files sequentially, right? my requirement is to
process all files at same time.

On Thu, Oct 6, 2016 at 8:52 AM, ayan guha <gu...@gmail.com> wrote:

> Hi
>
> In this case, if you see, t[1] is NOT the file content, as I have added a
> "FileType" field. So, this collect is just bringing in the list of file
> types, should be fine
>
> On Thu, Oct 6, 2016 at 11:47 PM, Arun Patel <ar...@gmail.com>
> wrote:
>
>> Thanks Ayan.  I am really concerned about the collect.
>>
>> types = rdd1.map(lambda t: t[1]).distinct().collect()
>>
>> This will ship all files on to the driver, right?  It must be
>> inefficient.
>>
>> On Thu, Oct 6, 2016 at 7:58 AM, ayan guha <gu...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I think you are correct direction. What is missing is: You do not need
>>> to create DF for each file. You can scramble files with similar structures
>>> together (by doing some filter on file name) and then create a DF for each
>>> type of file. Also, creating DF on wholeTextFile seems wasteful to me. I
>>> would probably do it like this
>>>
>>> rdd1 = sc.wholeTextFile(inputpath).map(lambda t:
>>> (t[0],generateFileType(t[0]),t[1])
>>> types = rdd1.map(lambda t: t[1]).distinct().collect()
>>>
>>> DFList = []
>>>
>>> for k in types:
>>>      df = rdd1.filter(lambda t: t[1]==k).toDF(schema=getSchemaFor(k))
>>>      DFList.append(df)
>>>
>>>
>>>
>>> On Thu, Oct 6, 2016 at 10:26 PM, Arun Patel <ar...@gmail.com>
>>> wrote:
>>>
>>>> My Pyspark program is currently identifies the list of the files from a
>>>> directory (Using Python Popen command taking hadoop fs -ls arguments).  For
>>>> each file, a Dataframe is created and processed. This is sequeatial. How to
>>>> process all files paralelly?  Please note that every file in the directory
>>>> has different schema.  So, depending on the file name, different logic is
>>>> used for each file. So, I cannot really create one Dataframe for all these
>>>> files and iterate each row.  Using wholeTextFiles seems to be good approach
>>>> for me.  But, I am not sure how to create DataFrame from this.  For
>>>> example, Is there a way to do this way do something like below.
>>>>
>>>> def createDFProcess(myrdd):
>>>>     df = sqlCtx.read.json(myrdd)
>>>>     df.show()
>>>>
>>>> whfiles = sc.wholeTextFiles('/input/dir1').toDF(['fname', 'fcontent'])
>>>> whfiles.map(lambda file: file.fcontent).foreach(createDFProcess)
>>>>
>>>> Above code does not work.  I get an error 'TypeError: 'JavaPackage'
>>>> object is not callable'.  How to make it work?
>>>>
>>>> Or is there a better approach?
>>>>
>>>> -Arun
>>>>
>>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>

Re: Best approach for processing all files parallelly

Posted by ayan guha <gu...@gmail.com>.
Hi

In this case, if you see, t[1] is NOT the file content, as I have added a
"FileType" field. So, this collect is just bringing in the list of file
types, should be fine

On Thu, Oct 6, 2016 at 11:47 PM, Arun Patel <ar...@gmail.com> wrote:

> Thanks Ayan.  I am really concerned about the collect.
>
> types = rdd1.map(lambda t: t[1]).distinct().collect()
>
> This will ship all files on to the driver, right?  It must be inefficient.
>
>
> On Thu, Oct 6, 2016 at 7:58 AM, ayan guha <gu...@gmail.com> wrote:
>
>> Hi
>>
>> I think you are correct direction. What is missing is: You do not need to
>> create DF for each file. You can scramble files with similar structures
>> together (by doing some filter on file name) and then create a DF for each
>> type of file. Also, creating DF on wholeTextFile seems wasteful to me. I
>> would probably do it like this
>>
>> rdd1 = sc.wholeTextFile(inputpath).map(lambda t:
>> (t[0],generateFileType(t[0]),t[1])
>> types = rdd1.map(lambda t: t[1]).distinct().collect()
>>
>> DFList = []
>>
>> for k in types:
>>      df = rdd1.filter(lambda t: t[1]==k).toDF(schema=getSchemaFor(k))
>>      DFList.append(df)
>>
>>
>>
>> On Thu, Oct 6, 2016 at 10:26 PM, Arun Patel <ar...@gmail.com>
>> wrote:
>>
>>> My Pyspark program is currently identifies the list of the files from a
>>> directory (Using Python Popen command taking hadoop fs -ls arguments).  For
>>> each file, a Dataframe is created and processed. This is sequeatial. How to
>>> process all files paralelly?  Please note that every file in the directory
>>> has different schema.  So, depending on the file name, different logic is
>>> used for each file. So, I cannot really create one Dataframe for all these
>>> files and iterate each row.  Using wholeTextFiles seems to be good approach
>>> for me.  But, I am not sure how to create DataFrame from this.  For
>>> example, Is there a way to do this way do something like below.
>>>
>>> def createDFProcess(myrdd):
>>>     df = sqlCtx.read.json(myrdd)
>>>     df.show()
>>>
>>> whfiles = sc.wholeTextFiles('/input/dir1').toDF(['fname', 'fcontent'])
>>> whfiles.map(lambda file: file.fcontent).foreach(createDFProcess)
>>>
>>> Above code does not work.  I get an error 'TypeError: 'JavaPackage'
>>> object is not callable'.  How to make it work?
>>>
>>> Or is there a better approach?
>>>
>>> -Arun
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha

Re: Best approach for processing all files parallelly

Posted by Arun Patel <ar...@gmail.com>.
Thanks Ayan.  I am really concerned about the collect.

types = rdd1.map(lambda t: t[1]).distinct().collect()

This will ship all files on to the driver, right?  It must be inefficient.


On Thu, Oct 6, 2016 at 7:58 AM, ayan guha <gu...@gmail.com> wrote:

> Hi
>
> I think you are correct direction. What is missing is: You do not need to
> create DF for each file. You can scramble files with similar structures
> together (by doing some filter on file name) and then create a DF for each
> type of file. Also, creating DF on wholeTextFile seems wasteful to me. I
> would probably do it like this
>
> rdd1 = sc.wholeTextFile(inputpath).map(lambda t:
> (t[0],generateFileType(t[0]),t[1])
> types = rdd1.map(lambda t: t[1]).distinct().collect()
>
> DFList = []
>
> for k in types:
>      df = rdd1.filter(lambda t: t[1]==k).toDF(schema=getSchemaFor(k))
>      DFList.append(df)
>
>
>
> On Thu, Oct 6, 2016 at 10:26 PM, Arun Patel <ar...@gmail.com>
> wrote:
>
>> My Pyspark program is currently identifies the list of the files from a
>> directory (Using Python Popen command taking hadoop fs -ls arguments).  For
>> each file, a Dataframe is created and processed. This is sequeatial. How to
>> process all files paralelly?  Please note that every file in the directory
>> has different schema.  So, depending on the file name, different logic is
>> used for each file. So, I cannot really create one Dataframe for all these
>> files and iterate each row.  Using wholeTextFiles seems to be good approach
>> for me.  But, I am not sure how to create DataFrame from this.  For
>> example, Is there a way to do this way do something like below.
>>
>> def createDFProcess(myrdd):
>>     df = sqlCtx.read.json(myrdd)
>>     df.show()
>>
>> whfiles = sc.wholeTextFiles('/input/dir1').toDF(['fname', 'fcontent'])
>> whfiles.map(lambda file: file.fcontent).foreach(createDFProcess)
>>
>> Above code does not work.  I get an error 'TypeError: 'JavaPackage'
>> object is not callable'.  How to make it work?
>>
>> Or is there a better approach?
>>
>> -Arun
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>

Re: Best approach for processing all files parallelly

Posted by ayan guha <gu...@gmail.com>.
Hi

I think you are correct direction. What is missing is: You do not need to
create DF for each file. You can scramble files with similar structures
together (by doing some filter on file name) and then create a DF for each
type of file. Also, creating DF on wholeTextFile seems wasteful to me. I
would probably do it like this

rdd1 = sc.wholeTextFile(inputpath).map(lambda t:
(t[0],generateFileType(t[0]),t[1])
types = rdd1.map(lambda t: t[1]).distinct().collect()

DFList = []

for k in types:
     df = rdd1.filter(lambda t: t[1]==k).toDF(schema=getSchemaFor(k))
     DFList.append(df)



On Thu, Oct 6, 2016 at 10:26 PM, Arun Patel <ar...@gmail.com> wrote:

> My Pyspark program is currently identifies the list of the files from a
> directory (Using Python Popen command taking hadoop fs -ls arguments).  For
> each file, a Dataframe is created and processed. This is sequeatial. How to
> process all files paralelly?  Please note that every file in the directory
> has different schema.  So, depending on the file name, different logic is
> used for each file. So, I cannot really create one Dataframe for all these
> files and iterate each row.  Using wholeTextFiles seems to be good approach
> for me.  But, I am not sure how to create DataFrame from this.  For
> example, Is there a way to do this way do something like below.
>
> def createDFProcess(myrdd):
>     df = sqlCtx.read.json(myrdd)
>     df.show()
>
> whfiles = sc.wholeTextFiles('/input/dir1').toDF(['fname', 'fcontent'])
> whfiles.map(lambda file: file.fcontent).foreach(createDFProcess)
>
> Above code does not work.  I get an error 'TypeError: 'JavaPackage' object
> is not callable'.  How to make it work?
>
> Or is there a better approach?
>
> -Arun
>
>


-- 
Best Regards,
Ayan Guha