You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Eric Beabes <ma...@gmail.com> on 2021/05/25 17:23:50 UTC

Reading parquet files in parallel on the cluster

I've a use case in which I need to read Parquet files in parallel from over
1000+ directories. I am doing something like this:

   val df = list.toList.toDF()

    df.foreach(c => {
      val config = *getConfigs()*
      doSomething(spark, config)
    })


In the doSomething method, when I try to do this:

val df1 = spark.read.parquet(pathToRead).collect()


I get a NullPointer exception given below. It seems the 'spark.read'
only works on the Driver not on the cluster. How can I do what I want
to do? Please let me know. Thank you.


21/05/25 17:03:50 WARN TaskSetManager: Lost task 2.0 in stage 8.0 (TID
9, ip-10-0-5-3.us-west-2.compute.internal, executor 11):
java.lang.NullPointerException

        at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)

        at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:142)

        at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:789)

        at org.apache.spark.sql.SparkSession.read(SparkSession.scala:656)

RE: Reading parquet files in parallel on the cluster

Posted by Boris Litvak <bo...@skf.com>.
Eric, can you do the following:

  1.  List the files on driver and parallelize the file names as a DataFrame, based on directory name
  2.  Compact the files in each directory in a task on the executor.

Alternatively and easier, you can just go over the directories in the driver using a simple for and launch a job per directory.
What am I missing?

Boris

From: Eric Beabes <ma...@gmail.com>
Sent: Wednesday, 26 May 2021 0:34
To: Sean Owen <sr...@gmail.com>
Cc: Silvio Fiorito <si...@granturing.com>; spark-user <us...@spark.apache.org>
Subject: Re: Reading parquet files in parallel on the cluster

Right... but the problem is still the same, no? Those N Jobs (aka Futures or Threads) will all be running on the Driver. Each with its own SparkSession. Isn't that going to put a lot of burden on one Machine? Is that really distributing the load across the cluster? Am I missing something?

Would it be better to use ECS (Elastic Container Service) for this use case which allows us to autoscale?

On Tue, May 25, 2021 at 2:16 PM Sean Owen <sr...@gmail.com>> wrote:
What you could do is launch N Spark jobs in parallel from the driver. Each one would process a directory you supply with spark.read.parquet, for example. You would just have 10s or 100s of those jobs running at the same time.  You have to write a bit of async code to do it, but it's pretty easy with Scala Futures.

On Tue, May 25, 2021 at 3:31 PM Eric Beabes <ma...@gmail.com>> wrote:
Here's the use case:

We've a bunch of directories (over 1000) which contain tons of small files in each. Each directory is for a different customer so they are independent in that respect. We need to merge all the small files in each directory into one (or a few) compacted file(s) by using a 'coalesce' function.

Clearly we can do this on the Driver by doing something like:

list.par.foreach (dir =>compact(spark, dir))

This works but the problem here is that the parallelism happens on Driver which won't scale when we've 10,000 customers! At any given time there will be only as many compactions happening as the number of cores on the Driver, right?

We were hoping to do this:

val df = list.toDF()
df.foreach(dir => compact(spark,dir))

Our hope was, this will distribute the load amongst Spark Executors & will scale better.  But this throws the NullPointerException shown in the original email.

Is there a better way to do this?


On Tue, May 25, 2021 at 1:10 PM Silvio Fiorito <si...@granturing.com>> wrote:
Why not just read from Spark as normal? Do these files have different or incompatible schemas?

val df = spark.read.option(“mergeSchema”, “true”).load(listOfPaths)

From: Eric Beabes <ma...@gmail.com>>
Date: Tuesday, May 25, 2021 at 1:24 PM
To: spark-user <us...@spark.apache.org>>
Subject: Reading parquet files in parallel on the cluster

I've a use case in which I need to read Parquet files in parallel from over 1000+ directories. I am doing something like this:


   val df = list.toList.toDF()

    df.foreach(c => {
      val config = getConfigs()
      doSomething(spark, config)
    })



In the doSomething method, when I try to do this:

val df1 = spark.read.parquet(pathToRead).collect()



I get a NullPointer exception given below. It seems the 'spark.read' only works on the Driver not on the cluster. How can I do what I want to do? Please let me know. Thank you.



21/05/25 17:03:50 WARN TaskSetManager: Lost task 2.0 in stage 8.0 (TID 9, ip-10-0-5-3.us-west-2.compute.internal, executor 11): java.lang.NullPointerException



        at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)



        at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:142)



        at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:789)



        at org.apache.spark.sql.SparkSession.read(SparkSession.scala:656)



Re: Reading parquet files in parallel on the cluster

Posted by Eric Beabes <ma...@gmail.com>.
Thanks for your time & advice. We will experiment & see which works best
for us.... EMR or ECS.

On Tue, May 25, 2021 at 2:39 PM Sean Owen <sr...@gmail.com> wrote:

> No, the work is happening on the cluster; you just have (say) 100 parallel
> jobs running at the same time. You apply spark.read.parquet to each dir --
> from the driver yes, but spark.read is distributed. At extremes, yes that
> would challenge the driver, to manage 1000s of jobs concurrently. You may
> also find that if each job is tiny, there's some overhead in running each
> as a distributed operation that may be significant. But it seems like the
> simplest thing and will probably work fine.
>
> On Tue, May 25, 2021 at 4:34 PM Eric Beabes <ma...@gmail.com>
> wrote:
>
>> Right... but the problem is still the same, no? Those N Jobs (aka Futures
>> or Threads) will all be running on the Driver. Each with its own
>> SparkSession. Isn't that going to put a lot of burden on one Machine? Is
>> that really distributing the load across the cluster? Am I missing
>> something?
>>
>> Would it be better to use ECS (Elastic Container Service) for this use
>> case which allows us to autoscale?
>>
>> On Tue, May 25, 2021 at 2:16 PM Sean Owen <sr...@gmail.com> wrote:
>>
>>> What you could do is launch N Spark jobs in parallel from the driver.
>>> Each one would process a directory you supply with spark.read.parquet, for
>>> example. You would just have 10s or 100s of those jobs running at the same
>>> time.  You have to write a bit of async code to do it, but it's pretty easy
>>> with Scala Futures.
>>>
>>> On Tue, May 25, 2021 at 3:31 PM Eric Beabes <ma...@gmail.com>
>>> wrote:
>>>
>>>> Here's the use case:
>>>>
>>>> We've a bunch of directories (over 1000) which contain tons of small
>>>> files in each. Each directory is for a different customer so they are
>>>> independent in that respect. We need to merge all the small files in each
>>>> directory into one (or a few) compacted file(s) by using a 'coalesce'
>>>> function.
>>>>
>>>> Clearly we can do this on the Driver by doing something like:
>>>>
>>>> list.par.foreach (dir =>compact(spark, dir))
>>>>
>>>> This works but the problem here is that the parallelism happens on
>>>> Driver which won't scale when we've 10,000 customers! At any given time
>>>> there will be only as many compactions happening as the number of cores on
>>>> the Driver, right?
>>>>
>>>> We were hoping to do this:
>>>>
>>>> val df = list.toDF()
>>>> df.foreach(dir => compact(spark,dir))
>>>>
>>>> Our hope was, this will distribute the load amongst Spark Executors &
>>>> will scale better.  But this throws the NullPointerException shown in the
>>>> original email.
>>>>
>>>> Is there a better way to do this?
>>>>
>>>>
>>>> On Tue, May 25, 2021 at 1:10 PM Silvio Fiorito <
>>>> silvio.fiorito@granturing.com> wrote:
>>>>
>>>>> Why not just read from Spark as normal? Do these files have different
>>>>> or incompatible schemas?
>>>>>
>>>>>
>>>>>
>>>>> val df = spark.read.option(“mergeSchema”, “true”).load(listOfPaths)
>>>>>
>>>>>
>>>>>
>>>>> *From: *Eric Beabes <ma...@gmail.com>
>>>>> *Date: *Tuesday, May 25, 2021 at 1:24 PM
>>>>> *To: *spark-user <us...@spark.apache.org>
>>>>> *Subject: *Reading parquet files in parallel on the cluster
>>>>>
>>>>>
>>>>>
>>>>> I've a use case in which I need to read Parquet files in parallel from
>>>>> over 1000+ directories. I am doing something like this:
>>>>>
>>>>>
>>>>>
>>>>>    val df = list.toList.toDF()
>>>>>
>>>>>     df.foreach(c => {
>>>>>       val config = *getConfigs()*
>>>>> *      doSomething*(spark, config)
>>>>>     })
>>>>>
>>>>>
>>>>>
>>>>> In the doSomething method, when I try to do this:
>>>>>
>>>>> val df1 = spark.read.parquet(pathToRead).collect()
>>>>>
>>>>>
>>>>>
>>>>> I get a NullPointer exception given below. It seems the 'spark.read' only works on the Driver not on the cluster. How can I do what I want to do? Please let me know. Thank you.
>>>>>
>>>>>
>>>>>
>>>>> 21/05/25 17:03:50 WARN TaskSetManager: Lost task 2.0 in stage 8.0 (TID
>>>>> 9, ip-10-0-5-3.us-west-2.compute.internal, executor 11):
>>>>> java.lang.NullPointerException
>>>>>
>>>>>
>>>>>
>>>>>         at
>>>>> org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)
>>>>>
>>>>>
>>>>>
>>>>>         at
>>>>> org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:142)
>>>>>
>>>>>
>>>>>
>>>>>         at
>>>>> org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:789)
>>>>>
>>>>>
>>>>>
>>>>>         at
>>>>> org.apache.spark.sql.SparkSession.read(SparkSession.scala:656)
>>>>>
>>>>>
>>>>>
>>>>>

Re: Reading parquet files in parallel on the cluster

Posted by Sean Owen <sr...@gmail.com>.
No, the work is happening on the cluster; you just have (say) 100 parallel
jobs running at the same time. You apply spark.read.parquet to each dir --
from the driver yes, but spark.read is distributed. At extremes, yes that
would challenge the driver, to manage 1000s of jobs concurrently. You may
also find that if each job is tiny, there's some overhead in running each
as a distributed operation that may be significant. But it seems like the
simplest thing and will probably work fine.

On Tue, May 25, 2021 at 4:34 PM Eric Beabes <ma...@gmail.com>
wrote:

> Right... but the problem is still the same, no? Those N Jobs (aka Futures
> or Threads) will all be running on the Driver. Each with its own
> SparkSession. Isn't that going to put a lot of burden on one Machine? Is
> that really distributing the load across the cluster? Am I missing
> something?
>
> Would it be better to use ECS (Elastic Container Service) for this use
> case which allows us to autoscale?
>
> On Tue, May 25, 2021 at 2:16 PM Sean Owen <sr...@gmail.com> wrote:
>
>> What you could do is launch N Spark jobs in parallel from the driver.
>> Each one would process a directory you supply with spark.read.parquet, for
>> example. You would just have 10s or 100s of those jobs running at the same
>> time.  You have to write a bit of async code to do it, but it's pretty easy
>> with Scala Futures.
>>
>> On Tue, May 25, 2021 at 3:31 PM Eric Beabes <ma...@gmail.com>
>> wrote:
>>
>>> Here's the use case:
>>>
>>> We've a bunch of directories (over 1000) which contain tons of small
>>> files in each. Each directory is for a different customer so they are
>>> independent in that respect. We need to merge all the small files in each
>>> directory into one (or a few) compacted file(s) by using a 'coalesce'
>>> function.
>>>
>>> Clearly we can do this on the Driver by doing something like:
>>>
>>> list.par.foreach (dir =>compact(spark, dir))
>>>
>>> This works but the problem here is that the parallelism happens on
>>> Driver which won't scale when we've 10,000 customers! At any given time
>>> there will be only as many compactions happening as the number of cores on
>>> the Driver, right?
>>>
>>> We were hoping to do this:
>>>
>>> val df = list.toDF()
>>> df.foreach(dir => compact(spark,dir))
>>>
>>> Our hope was, this will distribute the load amongst Spark Executors &
>>> will scale better.  But this throws the NullPointerException shown in the
>>> original email.
>>>
>>> Is there a better way to do this?
>>>
>>>
>>> On Tue, May 25, 2021 at 1:10 PM Silvio Fiorito <
>>> silvio.fiorito@granturing.com> wrote:
>>>
>>>> Why not just read from Spark as normal? Do these files have different
>>>> or incompatible schemas?
>>>>
>>>>
>>>>
>>>> val df = spark.read.option(“mergeSchema”, “true”).load(listOfPaths)
>>>>
>>>>
>>>>
>>>> *From: *Eric Beabes <ma...@gmail.com>
>>>> *Date: *Tuesday, May 25, 2021 at 1:24 PM
>>>> *To: *spark-user <us...@spark.apache.org>
>>>> *Subject: *Reading parquet files in parallel on the cluster
>>>>
>>>>
>>>>
>>>> I've a use case in which I need to read Parquet files in parallel from
>>>> over 1000+ directories. I am doing something like this:
>>>>
>>>>
>>>>
>>>>    val df = list.toList.toDF()
>>>>
>>>>     df.foreach(c => {
>>>>       val config = *getConfigs()*
>>>> *      doSomething*(spark, config)
>>>>     })
>>>>
>>>>
>>>>
>>>> In the doSomething method, when I try to do this:
>>>>
>>>> val df1 = spark.read.parquet(pathToRead).collect()
>>>>
>>>>
>>>>
>>>> I get a NullPointer exception given below. It seems the 'spark.read' only works on the Driver not on the cluster. How can I do what I want to do? Please let me know. Thank you.
>>>>
>>>>
>>>>
>>>> 21/05/25 17:03:50 WARN TaskSetManager: Lost task 2.0 in stage 8.0 (TID
>>>> 9, ip-10-0-5-3.us-west-2.compute.internal, executor 11):
>>>> java.lang.NullPointerException
>>>>
>>>>
>>>>
>>>>         at
>>>> org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)
>>>>
>>>>
>>>>
>>>>         at
>>>> org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:142)
>>>>
>>>>
>>>>
>>>>         at
>>>> org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:789)
>>>>
>>>>
>>>>
>>>>         at
>>>> org.apache.spark.sql.SparkSession.read(SparkSession.scala:656)
>>>>
>>>>
>>>>
>>>>

Re: Reading parquet files in parallel on the cluster

Posted by Eric Beabes <ma...@gmail.com>.
Right... but the problem is still the same, no? Those N Jobs (aka Futures
or Threads) will all be running on the Driver. Each with its own
SparkSession. Isn't that going to put a lot of burden on one Machine? Is
that really distributing the load across the cluster? Am I missing
something?

Would it be better to use ECS (Elastic Container Service) for this use case
which allows us to autoscale?

On Tue, May 25, 2021 at 2:16 PM Sean Owen <sr...@gmail.com> wrote:

> What you could do is launch N Spark jobs in parallel from the driver. Each
> one would process a directory you supply with spark.read.parquet, for
> example. You would just have 10s or 100s of those jobs running at the same
> time.  You have to write a bit of async code to do it, but it's pretty easy
> with Scala Futures.
>
> On Tue, May 25, 2021 at 3:31 PM Eric Beabes <ma...@gmail.com>
> wrote:
>
>> Here's the use case:
>>
>> We've a bunch of directories (over 1000) which contain tons of small
>> files in each. Each directory is for a different customer so they are
>> independent in that respect. We need to merge all the small files in each
>> directory into one (or a few) compacted file(s) by using a 'coalesce'
>> function.
>>
>> Clearly we can do this on the Driver by doing something like:
>>
>> list.par.foreach (dir =>compact(spark, dir))
>>
>> This works but the problem here is that the parallelism happens on Driver
>> which won't scale when we've 10,000 customers! At any given time there will
>> be only as many compactions happening as the number of cores on the Driver,
>> right?
>>
>> We were hoping to do this:
>>
>> val df = list.toDF()
>> df.foreach(dir => compact(spark,dir))
>>
>> Our hope was, this will distribute the load amongst Spark Executors &
>> will scale better.  But this throws the NullPointerException shown in the
>> original email.
>>
>> Is there a better way to do this?
>>
>>
>> On Tue, May 25, 2021 at 1:10 PM Silvio Fiorito <
>> silvio.fiorito@granturing.com> wrote:
>>
>>> Why not just read from Spark as normal? Do these files have different or
>>> incompatible schemas?
>>>
>>>
>>>
>>> val df = spark.read.option(“mergeSchema”, “true”).load(listOfPaths)
>>>
>>>
>>>
>>> *From: *Eric Beabes <ma...@gmail.com>
>>> *Date: *Tuesday, May 25, 2021 at 1:24 PM
>>> *To: *spark-user <us...@spark.apache.org>
>>> *Subject: *Reading parquet files in parallel on the cluster
>>>
>>>
>>>
>>> I've a use case in which I need to read Parquet files in parallel from
>>> over 1000+ directories. I am doing something like this:
>>>
>>>
>>>
>>>    val df = list.toList.toDF()
>>>
>>>     df.foreach(c => {
>>>       val config = *getConfigs()*
>>> *      doSomething*(spark, config)
>>>     })
>>>
>>>
>>>
>>> In the doSomething method, when I try to do this:
>>>
>>> val df1 = spark.read.parquet(pathToRead).collect()
>>>
>>>
>>>
>>> I get a NullPointer exception given below. It seems the 'spark.read' only works on the Driver not on the cluster. How can I do what I want to do? Please let me know. Thank you.
>>>
>>>
>>>
>>> 21/05/25 17:03:50 WARN TaskSetManager: Lost task 2.0 in stage 8.0 (TID
>>> 9, ip-10-0-5-3.us-west-2.compute.internal, executor 11):
>>> java.lang.NullPointerException
>>>
>>>
>>>
>>>         at
>>> org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)
>>>
>>>
>>>
>>>         at
>>> org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:142)
>>>
>>>
>>>
>>>         at
>>> org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:789)
>>>
>>>
>>>
>>>         at org.apache.spark.sql.SparkSession.read(SparkSession.scala:656)
>>>
>>>
>>>
>>>

Re: Reading parquet files in parallel on the cluster

Posted by Sean Owen <sr...@gmail.com>.
What you could do is launch N Spark jobs in parallel from the driver. Each
one would process a directory you supply with spark.read.parquet, for
example. You would just have 10s or 100s of those jobs running at the same
time.  You have to write a bit of async code to do it, but it's pretty easy
with Scala Futures.

On Tue, May 25, 2021 at 3:31 PM Eric Beabes <ma...@gmail.com>
wrote:

> Here's the use case:
>
> We've a bunch of directories (over 1000) which contain tons of small files
> in each. Each directory is for a different customer so they are independent
> in that respect. We need to merge all the small files in each directory
> into one (or a few) compacted file(s) by using a 'coalesce' function.
>
> Clearly we can do this on the Driver by doing something like:
>
> list.par.foreach (dir =>compact(spark, dir))
>
> This works but the problem here is that the parallelism happens on Driver
> which won't scale when we've 10,000 customers! At any given time there will
> be only as many compactions happening as the number of cores on the Driver,
> right?
>
> We were hoping to do this:
>
> val df = list.toDF()
> df.foreach(dir => compact(spark,dir))
>
> Our hope was, this will distribute the load amongst Spark Executors & will
> scale better.  But this throws the NullPointerException shown in the
> original email.
>
> Is there a better way to do this?
>
>
> On Tue, May 25, 2021 at 1:10 PM Silvio Fiorito <
> silvio.fiorito@granturing.com> wrote:
>
>> Why not just read from Spark as normal? Do these files have different or
>> incompatible schemas?
>>
>>
>>
>> val df = spark.read.option(“mergeSchema”, “true”).load(listOfPaths)
>>
>>
>>
>> *From: *Eric Beabes <ma...@gmail.com>
>> *Date: *Tuesday, May 25, 2021 at 1:24 PM
>> *To: *spark-user <us...@spark.apache.org>
>> *Subject: *Reading parquet files in parallel on the cluster
>>
>>
>>
>> I've a use case in which I need to read Parquet files in parallel from
>> over 1000+ directories. I am doing something like this:
>>
>>
>>
>>    val df = list.toList.toDF()
>>
>>     df.foreach(c => {
>>       val config = *getConfigs()*
>> *      doSomething*(spark, config)
>>     })
>>
>>
>>
>> In the doSomething method, when I try to do this:
>>
>> val df1 = spark.read.parquet(pathToRead).collect()
>>
>>
>>
>> I get a NullPointer exception given below. It seems the 'spark.read' only works on the Driver not on the cluster. How can I do what I want to do? Please let me know. Thank you.
>>
>>
>>
>> 21/05/25 17:03:50 WARN TaskSetManager: Lost task 2.0 in stage 8.0 (TID 9,
>> ip-10-0-5-3.us-west-2.compute.internal, executor 11):
>> java.lang.NullPointerException
>>
>>
>>
>>         at
>> org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)
>>
>>
>>
>>         at
>> org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:142)
>>
>>
>>
>>         at
>> org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:789)
>>
>>
>>
>>         at org.apache.spark.sql.SparkSession.read(SparkSession.scala:656)
>>
>>
>>
>>

Re: Reading parquet files in parallel on the cluster

Posted by Eric Beabes <ma...@gmail.com>.
Here's the use case:

We've a bunch of directories (over 1000) which contain tons of small files
in each. Each directory is for a different customer so they are independent
in that respect. We need to merge all the small files in each directory
into one (or a few) compacted file(s) by using a 'coalesce' function.

Clearly we can do this on the Driver by doing something like:

list.par.foreach (dir =>compact(spark, dir))

This works but the problem here is that the parallelism happens on Driver
which won't scale when we've 10,000 customers! At any given time there will
be only as many compactions happening as the number of cores on the Driver,
right?

We were hoping to do this:

val df = list.toDF()
df.foreach(dir => compact(spark,dir))

Our hope was, this will distribute the load amongst Spark Executors & will
scale better.  But this throws the NullPointerException shown in the
original email.

Is there a better way to do this?


On Tue, May 25, 2021 at 1:10 PM Silvio Fiorito <
silvio.fiorito@granturing.com> wrote:

> Why not just read from Spark as normal? Do these files have different or
> incompatible schemas?
>
>
>
> val df = spark.read.option(“mergeSchema”, “true”).load(listOfPaths)
>
>
>
> *From: *Eric Beabes <ma...@gmail.com>
> *Date: *Tuesday, May 25, 2021 at 1:24 PM
> *To: *spark-user <us...@spark.apache.org>
> *Subject: *Reading parquet files in parallel on the cluster
>
>
>
> I've a use case in which I need to read Parquet files in parallel from
> over 1000+ directories. I am doing something like this:
>
>
>
>    val df = list.toList.toDF()
>
>     df.foreach(c => {
>       val config = *getConfigs()*
> *      doSomething*(spark, config)
>     })
>
>
>
> In the doSomething method, when I try to do this:
>
> val df1 = spark.read.parquet(pathToRead).collect()
>
>
>
> I get a NullPointer exception given below. It seems the 'spark.read' only works on the Driver not on the cluster. How can I do what I want to do? Please let me know. Thank you.
>
>
>
> 21/05/25 17:03:50 WARN TaskSetManager: Lost task 2.0 in stage 8.0 (TID 9,
> ip-10-0-5-3.us-west-2.compute.internal, executor 11):
> java.lang.NullPointerException
>
>
>
>         at
> org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)
>
>
>
>         at
> org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:142)
>
>
>
>         at
> org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:789)
>
>
>
>         at org.apache.spark.sql.SparkSession.read(SparkSession.scala:656)
>
>
>
>

Re: Reading parquet files in parallel on the cluster

Posted by Silvio Fiorito <si...@granturing.com>.
Why not just read from Spark as normal? Do these files have different or incompatible schemas?

val df = spark.read.option(“mergeSchema”, “true”).load(listOfPaths)

From: Eric Beabes <ma...@gmail.com>
Date: Tuesday, May 25, 2021 at 1:24 PM
To: spark-user <us...@spark.apache.org>
Subject: Reading parquet files in parallel on the cluster

I've a use case in which I need to read Parquet files in parallel from over 1000+ directories. I am doing something like this:


   val df = list.toList.toDF()

    df.foreach(c => {
      val config = getConfigs()
      doSomething(spark, config)
    })



In the doSomething method, when I try to do this:

val df1 = spark.read.parquet(pathToRead).collect()



I get a NullPointer exception given below. It seems the 'spark.read' only works on the Driver not on the cluster. How can I do what I want to do? Please let me know. Thank you.



21/05/25 17:03:50 WARN TaskSetManager: Lost task 2.0 in stage 8.0 (TID 9, ip-10-0-5-3.us-west-2.compute.internal, executor 11): java.lang.NullPointerException



        at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)



        at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:142)



        at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:789)



        at org.apache.spark.sql.SparkSession.read(SparkSession.scala:656)



Re: Reading parquet files in parallel on the cluster

Posted by Sean Owen <sr...@gmail.com>.
Right, you can't use Spark within Spark.
Do you actually need to read Parquet like this vs spark.read.parquet?
that's also parallel of course.
You'd otherwise be reading the files directly in your function with the
Parquet APIs.

On Tue, May 25, 2021 at 12:24 PM Eric Beabes <ma...@gmail.com>
wrote:

> I've a use case in which I need to read Parquet files in parallel from
> over 1000+ directories. I am doing something like this:
>
>    val df = list.toList.toDF()
>
>     df.foreach(c => {
>       val config = *getConfigs()*
>       doSomething(spark, config)
>     })
>
>
> In the doSomething method, when I try to do this:
>
> val df1 = spark.read.parquet(pathToRead).collect()
>
>
> I get a NullPointer exception given below. It seems the 'spark.read' only works on the Driver not on the cluster. How can I do what I want to do? Please let me know. Thank you.
>
>
> 21/05/25 17:03:50 WARN TaskSetManager: Lost task 2.0 in stage 8.0 (TID 9, ip-10-0-5-3.us-west-2.compute.internal, executor 11): java.lang.NullPointerException
>
>         at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)
>
>         at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:142)
>
>         at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:789)
>
>         at org.apache.spark.sql.SparkSession.read(SparkSession.scala:656)
>
>