Posted to user@spark.apache.org by Patrick McCarthy <pm...@dstillery.com> on 2017/04/14 16:10:15 UTC

Memory problems with simple ETL in Pyspark

Hello,

I'm trying to build an ETL job which takes in 30-100 GB of text data and
prepares it for SparkML. I don't speak Scala, so I've been trying to
implement it in PySpark on YARN (Spark 2.1).

Despite the transformations being fairly simple, the job always fails by
running out of executor memory.

The input table is long (~6bn rows) but composed of three simple values:

#####################################################################
all_data_long.printSchema()

root
|-- id: long (nullable = true)
|-- label: short (nullable = true)
|-- segment: string (nullable = true)

#####################################################################

First I join it to a table of particular segments of interest and do an
aggregation:

#####################################################################

audiences.printSchema()

root
 |-- entry: integer (nullable = true)
 |-- descr: string (nullable = true)


print("Num in adl: {}".format(str(all_data_long.count())))

aud_str = audiences.select(audiences['entry'].cast('string'),
        audiences['descr'])

alldata_aud = all_data_long.join(aud_str,
        all_data_long['segment']==aud_str['entry'],
        'left_outer')

str_idx  = StringIndexer(inputCol='segment',outputCol='indexedSegs')

idx_df   = str_idx.fit(alldata_aud)
label_df = idx_df.transform(alldata_aud).withColumnRenamed('label','label_val')

id_seg = (label_df
        .filter(label_df.descr.isNotNull())
        .groupBy('id')
        .agg(collect_list('descr')))

id_seg.write.saveAsTable("hive.id_seg")

#####################################################################

Then, I use that StringIndexer again on the first data frame to featurize
the segment ID:

#####################################################################

alldat_idx = idx_df.transform(all_data_long).withColumnRenamed('label','label_val')

#####################################################################


My ultimate goal is to make a SparseVector, so I group the indexed segments
by id and try to cast them into a vector:

#####################################################################

# note: this max is the SQL aggregate function, shadowing Python's builtin
from pyspark.sql.functions import udf, lit, sort_array, max
from pyspark.ml.linalg import Vectors, VectorUDT

list_to_sparse_udf = udf(lambda l, maxlen: Vectors.sparse(maxlen, {v:1.0 for v in l}),
        VectorUDT())

alldat_idx.cache()

feature_vec_len = (alldat_idx.select(max('indexedSegs')).first()[0] + 1)

print("alldat_dix: {}".format(str(alldat_idx.count())))

feature_df = (alldat_idx
        .withColumn('label',alldat_idx['label_val'].cast('double'))
        .groupBy('id','label')
        .agg(sort_array(collect_list('indexedSegs')).alias('collect_list_is'))
        .withColumn('num_feat',lit(feature_vec_len))
        .withColumn('features',list_to_sparse_udf('collect_list_is','num_feat'))
        .drop('collect_list_is')
        .drop('num_feat'))

feature_df.cache()
print("Num in featuredf: {}".format(str(feature_df.count())))  ## <-
failure occurs here

#####################################################################

Here, however, I always run out of memory on the executors (I've twiddled
driver and executor memory to check) and YARN kills off my containers. I've
gone as high as --executor-memory 15g but it still doesn't help.

Given that the number of segments is at most 50,000, I'm surprised that a
smallish row-wise operation is enough to blow up the process.
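
For a rough sense of scale, assuming something like 50 active segments per id
(a guess), a single row's sparse vector over the ~57k-wide feature space is
tiny:

from pyspark.ml.linalg import Vectors

# ~50 integer indices plus ~50 doubles in a 56,845-dimensional space,
# on the order of a kilobyte per row
row_vec = Vectors.sparse(56845, {i: 1.0 for i in range(50)})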


Is it really the UDF that's killing me? Do I have to rewrite it in Scala?





Query plans for the failing stage:

#####################################################################


== Parsed Logical Plan ==
Aggregate [count(1) AS count#265L]
+- Project [id#0L, label#183, features#208]
   +- Project [id#0L, label#183, num_feat#202, features#208]
      +- Project [id#0L, label#183, collect_list_is#197, num_feat#202,
<lambda>(collect_list_is#197, num_feat#202) AS features#208]
         +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS
num_feat#202]
            +- Aggregate [id#0L, label#183], [id#0L, label#183,
sort_array(collect_list(indexedSegs#93, 0, 0), true) AS collect_list_is#197]
               +- Project [id#0L, label_val#99, segment#2, indexedSegs#93,
cast(label_val#99 as double) AS label#183]
                  +- Project [id#0L, label#1 AS label_val#99, segment#2,
indexedSegs#93]
                     +- Project [id#0L, label#1, segment#2,
UDF(cast(segment#2 as string)) AS indexedSegs#93]
                        +- MetastoreRelation pmccarthy, all_data_long

== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#265L]
+- Project [id#0L, label#183, features#208]
   +- Project [id#0L, label#183, num_feat#202, features#208]
      +- Project [id#0L, label#183, collect_list_is#197, num_feat#202,
<lambda>(collect_list_is#197, num_feat#202) AS features#208]
         +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS
num_feat#202]
            +- Aggregate [id#0L, label#183], [id#0L, label#183,
sort_array(collect_list(indexedSegs#93, 0, 0), true) AS collect_list_is#197]
               +- Project [id#0L, label_val#99, segment#2, indexedSegs#93,
cast(label_val#99 as double) AS label#183]
                  +- Project [id#0L, label#1 AS label_val#99, segment#2,
indexedSegs#93]
                     +- Project [id#0L, label#1, segment#2,
UDF(cast(segment#2 as string)) AS indexedSegs#93]
                        +- MetastoreRelation pmccarthy, all_data_long

== Optimized Logical Plan ==
Aggregate [count(1) AS count#265L]
+- Project
   +- InMemoryRelation [id#0L, label#183, features#208], true, 10000,
StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Project [id#0L, label#183, pythonUDF0#244 AS features#208]
            +- BatchEvalPython [<lambda>(collect_list_is#197, 56845.0)],
[id#0L, label#183, collect_list_is#197, pythonUDF0#244]
               +- SortAggregate(key=[id#0L, label#183],
functions=[collect_list(indexedSegs#93, 0, 0)], output=[id#0L, label#183,
collect_list_is#197])
                  +- *Sort [id#0L ASC NULLS FIRST, label#183 ASC NULLS
FIRST], false, 0
                     +- Exchange hashpartitioning(id#0L, label#183, 200)
                        +- *Project [id#0L, indexedSegs#93,
cast(label_val#99 as double) AS label#183]
                           +- InMemoryTableScan [id#0L, indexedSegs#93,
label_val#99]
                                 +- InMemoryRelation [id#0L, label_val#99,
segment#2, indexedSegs#93], true, 10000, StorageLevel(disk, memory,
deserialized, 1 replicas)
                                       +- *Project [id#0L, label#1 AS
label_val#99, segment#2, UDF(segment#2) AS indexedSegs#93]
                                          +- HiveTableScan [id#0L, label#1,
segment#2], MetastoreRelation pmccarthy, all_data_long

== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)], output=[count#265L])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)],
output=[count#284L])
      +- InMemoryTableScan
            +- InMemoryRelation [id#0L, label#183, features#208], true,
10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *Project [id#0L, label#183, pythonUDF0#244 AS
features#208]
                     +- BatchEvalPython [<lambda>(collect_list_is#197,
56845.0)], [id#0L, label#183, collect_list_is#197, pythonUDF0#244]
                        +- SortAggregate(key=[id#0L, label#183],
functions=[collect_list(indexedSegs#93, 0, 0)], output=[id#0L, label#183,
collect_list_is#197])
                           +- *Sort [id#0L ASC NULLS FIRST, label#183 ASC
NULLS FIRST], false, 0
                              +- Exchange hashpartitioning(id#0L,
label#183, 200)
                                 +- *Project [id#0L, indexedSegs#93,
cast(label_val#99 as double) AS label#183]
                                    +- InMemoryTableScan [id#0L,
indexedSegs#93, label_val#99]
                                          +- InMemoryRelation [id#0L,
label_val#99, segment#2, indexedSegs#93], true, 10000, StorageLevel(disk,
memory, deserialized, 1 replicas)
                                                +- *Project [id#0L, label#1
AS label_val#99, segment#2, UDF(segment#2) AS indexedSegs#93]
                                                   +- HiveTableScan [id#0L,
label#1, segment#2], MetastoreRelation pmccarthy, all_data_long

Re: Memory problems with simple ETL in Pyspark

Posted by ayan guha <gu...@gmail.com>.
Good to know it worked. If some of the tasks still fail, that can indicate
skew in your dataset. You may want to think about partitioning by a function.

Also, do you still see containers killed by YARN? If so, at what point? You
should see something like "your app is trying to use x GB while YARN can
provide only y GB". You have the option to go a little higher on executor
memory, maybe up to 18G with a 2G overhead. Finally, you may want to tweak
the memory fraction settings a little to see if you can salvage the failed
jobs.
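
For example, a rough sketch of those knobs (the numbers are only guesses and
depend on your cluster, not values tested against this job):

# memory side, via spark-submit (Spark 2.1 on YARN):
#   --executor-memory 16g
#   --conf spark.yarn.executor.memoryOverhead=2048
#   --conf spark.memory.fraction=0.6
#
# skew side: repartition by the grouping key before the heavy aggregation
# (DataFrame and column names taken from your job above)
alldat_idx = alldat_idx.repartition(2000, 'id')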

Best
Ayan



Re: Memory problems with simple ETL in Pyspark

Posted by Patrick McCarthy <pm...@dstillery.com>.
The partitions helped!

I added repartition() and my function looks like this now:

feature_df = (alldat_idx
        .withColumn('label',alldat_idx['label_val'].cast('double'))
        .groupBy('id','label')
        .agg(sort_array(collect_list('indexedSegs')).alias('collect_list_is'))
        .repartition(1000)
        .withColumn('num_feat',lit(feature_vec_len))
        .withColumn('features',list_to_sparse_udf('collect_list_is','num_feat'))
        .drop('collect_list_is')
        .drop('num_feat'))

I got a few containers that failed from memory overflow, but the job was able
to finish successfully. I tried upping the repartition count as high as 4000,
but a few still failed.

For posterity's sake, where would I look for the footprint you have in
mind? On the Executors tab?

Since the audience part of the task finished successfully and the failure
was on a df that didn't touch it, it shouldn't have made a difference.

Thank you!


Re: Memory problems with simple ETL in Pyspark

Posted by ayan guha <gu...@gmail.com>.
What I missed: try increasing the number of partitions using repartition().
-- 
Best Regards,
Ayan Guha

Re: Memory problems with simple ETL in Pyspark

Posted by ayan guha <gu...@gmail.com>.
It does not look like a Scala vs. Python thing. How big is your audience data
store? Can it be broadcast?
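
For example, a sketch using the names from your code (assuming the audiences
table is only a few MB):

from pyspark.sql.functions import broadcast

# ship the small table to every executor instead of shuffling the big one
alldata_aud = all_data_long.join(broadcast(aud_str),
        all_data_long['segment'] == aud_str['entry'],
        'left_outer')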

What is the memory footprint you are seeing? At what point is YARN killing
the containers? Depending on that, you may want to tweak the number of
partitions of the input dataset and increase the number of executors.

Ayan


Best Regards,
Ayan Guha