You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Vedant Dhandhania <ve...@retentionscience.com> on 2014/07/18 23:56:22 UTC

Broadcasting a set in PySpark

Hi All,

I am trying to broadcast a set in a PySpark script.

I create the set like this:

Uid_male_set = set(maleUsers.map(lambda x:x[1]).collect())


Then execute this line:


uid_iid_iscore_tuple_GenderFlag = uid_iid_iscore.map(lambda
x:(x[0],zip(x[1],x[2]),x[0] in Uid_male_set))


 An error occurred while calling o104.collectPartitions.

: org.apache.spark.SparkException: Job aborted due to stage failure:
Serialized task 1131:0 was 23503247 bytes which exceeds
spark.akka.frameSize (10485760 bytes). Consider using broadcast variables
for large values.



So I tried broadcasting it:

Uid_male_setbc = sc.broadcast(Uid_male_set)


>>> Uid_male_setbc

<pyspark.broadcast.Broadcast object at 0x1ba2ed0>


Then I execute it line:


uid_iid_iscore_tuple_GenderFlag = uid_iid_iscore.map(lambda
x:(x[0],zip(x[1],x[2]),x[0] in Uid_male_setbc))

ile "<stdin>", line 1, in <lambda>

TypeError: argument of type 'Broadcast' is not iterable

 [duplicate 1]


So I am stuck either ways, the script runs locally well on a smaller
dataset, but throws me this error. Could any one point out how to correct
this or where I am going wrong?

Thanks


*Vedant Dhandhania*

*Retention** Science*

call: 805.574.0873

visit: Site <http://www.retentionscience.com/> | like: Facebook
<http://www.facebook.com/RetentionScience> | follow: Twitter
<http://twitter.com/RetentionSci>

Re: Broadcasting a set in PySpark

Posted by Vedant Dhandhania <ve...@retentionscience.com>.
Hi Josh,
I did make that change, however I get this error now:

568.492: [GC [PSYoungGen: 1412948K->207017K(1465088K)]
4494287K->3471149K(4960384K), 0.1280200 secs] [Times: user=0.23 sys=0.63,
real=0.13 secs]

568.642: [Full GCTraceback (most recent call last):

  File "<stdin>", line 1, in <module>

  File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py", line
708, in count

    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()

  File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py", line
699, in sum

    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

  File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py", line
619, in reduce

    vals = self.mapPartitions(func).collect()

  File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py", line
583, in collect

    bytesInJava = self._jrdd.collect().iterator()

  File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py", line
94, in __exit__

    self._context._jsc.setCallSite(None)

  File
"/home/hadoop/spark-1.0.1-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
line 535, in __call__

  File
"/home/hadoop/spark-1.0.1-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
line 361, in send_command

  File
"/home/hadoop/spark-1.0.1-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
line 317, in _get_connection

  File
"/home/hadoop/spark-1.0.1-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
line 324, in _create_connection

  File
"/home/hadoop/spark-1.0.1-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
line 431, in start

py4j.protocol.Py4JNetworkError: An error occurred while trying to connect
to the Java server


*Vedant Dhandhania*

*Retention** Science*

call: 805.574.0873

visit: Site <http://www.retentionscience.com/> | like: Facebook
<http://www.facebook.com/RetentionScience> | follow: Twitter
<http://twitter.com/RetentionSci>


On Fri, Jul 18, 2014 at 3:10 PM, Josh Rosen <ro...@gmail.com> wrote:

> You have to use `myBroadcastVariable.value` to access the broadcasted
> value; see
> https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
>
>
> On Fri, Jul 18, 2014 at 2:56 PM, Vedant Dhandhania <
> vedant@retentionscience.com> wrote:
>
>> Hi All,
>>
>> I am trying to broadcast a set in a PySpark script.
>>
>> I create the set like this:
>>
>> Uid_male_set = set(maleUsers.map(lambda x:x[1]).collect())
>>
>>
>> Then execute this line:
>>
>>
>> uid_iid_iscore_tuple_GenderFlag = uid_iid_iscore.map(lambda
>> x:(x[0],zip(x[1],x[2]),x[0] in Uid_male_set))
>>
>>
>>  An error occurred while calling o104.collectPartitions.
>>
>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>> Serialized task 1131:0 was 23503247 bytes which exceeds
>> spark.akka.frameSize (10485760 bytes). Consider using broadcast variables
>> for large values.
>>
>>
>>
>> So I tried broadcasting it:
>>
>> Uid_male_setbc = sc.broadcast(Uid_male_set)
>>
>>
>> >>> Uid_male_setbc
>>
>> <pyspark.broadcast.Broadcast object at 0x1ba2ed0>
>>
>>
>> Then I execute it line:
>>
>>
>> uid_iid_iscore_tuple_GenderFlag = uid_iid_iscore.map(lambda
>> x:(x[0],zip(x[1],x[2]),x[0] in Uid_male_setbc))
>>
>> ile "<stdin>", line 1, in <lambda>
>>
>> TypeError: argument of type 'Broadcast' is not iterable
>>
>>  [duplicate 1]
>>
>>
>> So I am stuck either ways, the script runs locally well on a smaller
>> dataset, but throws me this error. Could any one point out how to correct
>> this or where I am going wrong?
>>
>> Thanks
>>
>>
>> *Vedant Dhandhania*
>>
>> *Retention** Science*
>>
>> call: 805.574.0873
>>
>> visit: Site <http://www.retentionscience.com/> | like: Facebook
>> <http://www.facebook.com/RetentionScience> | follow: Twitter
>> <http://twitter.com/RetentionSci>
>>
>
>

Re: Broadcasting a set in PySpark

Posted by Josh Rosen <ro...@gmail.com>.
You have to use `myBroadcastVariable.value` to access the broadcasted
value; see
https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables


On Fri, Jul 18, 2014 at 2:56 PM, Vedant Dhandhania <
vedant@retentionscience.com> wrote:

> Hi All,
>
> I am trying to broadcast a set in a PySpark script.
>
> I create the set like this:
>
> Uid_male_set = set(maleUsers.map(lambda x:x[1]).collect())
>
>
> Then execute this line:
>
>
> uid_iid_iscore_tuple_GenderFlag = uid_iid_iscore.map(lambda
> x:(x[0],zip(x[1],x[2]),x[0] in Uid_male_set))
>
>
>  An error occurred while calling o104.collectPartitions.
>
> : org.apache.spark.SparkException: Job aborted due to stage failure:
> Serialized task 1131:0 was 23503247 bytes which exceeds
> spark.akka.frameSize (10485760 bytes). Consider using broadcast variables
> for large values.
>
>
>
> So I tried broadcasting it:
>
> Uid_male_setbc = sc.broadcast(Uid_male_set)
>
>
> >>> Uid_male_setbc
>
> <pyspark.broadcast.Broadcast object at 0x1ba2ed0>
>
>
> Then I execute it line:
>
>
> uid_iid_iscore_tuple_GenderFlag = uid_iid_iscore.map(lambda
> x:(x[0],zip(x[1],x[2]),x[0] in Uid_male_setbc))
>
> ile "<stdin>", line 1, in <lambda>
>
> TypeError: argument of type 'Broadcast' is not iterable
>
>  [duplicate 1]
>
>
> So I am stuck either ways, the script runs locally well on a smaller
> dataset, but throws me this error. Could any one point out how to correct
> this or where I am going wrong?
>
> Thanks
>
>
> *Vedant Dhandhania*
>
> *Retention** Science*
>
> call: 805.574.0873
>
> visit: Site <http://www.retentionscience.com/> | like: Facebook
> <http://www.facebook.com/RetentionScience> | follow: Twitter
> <http://twitter.com/RetentionSci>
>